From 3206da5129969a7bc57f618a72a369cd6dca1c6b Mon Sep 17 00:00:00 2001 From: Age Manning Date: Mon, 24 Aug 2020 23:53:13 +1000 Subject: [PATCH] Gossipsub 1.1 ammendments (#50) * Shuffling things around * Further minor ammendments * Misc typos and improvements * Correct tests * Wasm support --- examples/gossipsub-chat.rs | 10 +- examples/ipfs-private.rs | 8 +- protocols/gossipsub/src/behaviour.rs | 447 +++++++++++-------- protocols/gossipsub/src/behaviour/tests.rs | 71 +-- protocols/gossipsub/src/config.rs | 32 +- protocols/gossipsub/src/gossip_promises.rs | 36 +- protocols/gossipsub/src/lib.rs | 4 +- protocols/gossipsub/src/peer_score/mod.rs | 24 +- protocols/gossipsub/src/peer_score/params.rs | 141 +++--- protocols/gossipsub/src/peer_score/tests.rs | 20 + protocols/gossipsub/src/protocol.rs | 3 +- protocols/gossipsub/src/time_cache.rs | 20 + protocols/gossipsub/src/types.rs | 32 ++ protocols/gossipsub/tests/smoke.rs | 2 +- 14 files changed, 532 insertions(+), 318 deletions(-) diff --git a/examples/gossipsub-chat.rs b/examples/gossipsub-chat.rs index 8cf77e6578e..946e8727c14 100644 --- a/examples/gossipsub-chat.rs +++ b/examples/gossipsub-chat.rs @@ -93,10 +93,12 @@ fn main() -> Result<(), Box> { .heartbeat_interval(Duration::from_secs(10)) .message_id_fn(message_id_fn) // content-address messages. No two messages of the //same content will be propagated. - .build().expect("Valid config"); + .build() + .expect("Valid config"); // build a gossipsub network behaviour let mut gossipsub = - gossipsub::Gossipsub::new(MessageAuthenticity::Signed(local_key), gossipsub_config); + gossipsub::Gossipsub::new(MessageAuthenticity::Signed(local_key), gossipsub_config) + .expect("Correct configuration"); gossipsub.subscribe(topic.clone()); if let Some(explicit) = std::env::args().nth(2) { let explicit = explicit.clone(); @@ -142,10 +144,10 @@ fn main() -> Result<(), Box> { loop { match swarm.poll_next_unpin(cx) { Poll::Ready(Some(gossip_event)) => match gossip_event { - GossipsubEvent::Message{ + GossipsubEvent::Message { propagation_source: peer_id, message_id: id, - message + message, } => println!( "Got message: {} with id: {} from peer: {:?}", String::from_utf8_lossy(&message.data), diff --git a/examples/ipfs-private.rs b/examples/ipfs-private.rs index e0324427a54..82ce9e189b4 100644 --- a/examples/ipfs-private.rs +++ b/examples/ipfs-private.rs @@ -192,7 +192,7 @@ fn main() -> Result<(), Box> { GossipsubEvent::Message { propagation_source: peer_id, message_id: id, - message + message, } => println!( "Got message: {} with id: {} from peer: {:?}", String::from_utf8_lossy(&message.data), @@ -245,12 +245,14 @@ fn main() -> Result<(), Box> { let mut swarm = { let gossipsub_config = GossipsubConfigBuilder::new() .max_transmit_size(262144) - .build().expect("valid config"); + .build() + .expect("valid config"); let mut behaviour = MyBehaviour { gossipsub: Gossipsub::new( MessageAuthenticity::Signed(local_key.clone()), gossipsub_config, - ), + ) + .expect("Valid configuration"), identify: Identify::new( "/ipfs/0.1.0".into(), "rust-ipfs-example".into(), diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 58946806a42..94fc32184f4 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -59,7 +59,7 @@ use crate::time_cache::DuplicateCache; use crate::topic::{Hasher, Topic, TopicHash}; use crate::types::{ GossipsubControlAction, GossipsubMessage, GossipsubSubscription, GossipsubSubscriptionAction, - MessageId, PeerInfo, + MessageAcceptance, MessageId, PeerInfo, }; use crate::types::{GossipsubRpc, PeerKind}; use crate::{rpc_proto, TopicScoreParams}; @@ -67,36 +67,6 @@ use std::cmp::Ordering::Equal; mod tests; -/// Event that can happen on the gossipsub behaviour. -#[derive(Debug)] -pub enum GossipsubEvent { - /// A message has been received. - Message { - /// The peer that forwarded us this message. - propagation_source: PeerId, - /// The `MessageId` of the message. This should be referenced by the application when - /// validating a message (if required). - message_id: MessageId, - /// The message itself. - message: GossipsubMessage, - }, - /// A remote subscribed to a topic. - Subscribed { - /// Remote that has subscribed. - peer_id: PeerId, - /// The topic it has subscribed to. - topic: TopicHash, - }, - - /// A remote unsubscribed from a topic. - Unsubscribed { - /// Remote that has unsubscribed. - peer_id: PeerId, - /// The topic it has subscribed from. - topic: TopicHash, - }, -} - /// Determines if published messages should be signed or not. /// /// Without signing, a number of privacy preserving modes can be selected. @@ -132,14 +102,14 @@ pub enum MessageAuthenticity { impl MessageAuthenticity { /// Returns true if signing is enabled. - fn is_signing(&self) -> bool { + pub fn is_signing(&self) -> bool { match self { MessageAuthenticity::Signed(_) => true, _ => false, } } - fn is_anonymous(&self) -> bool { + pub fn is_anonymous(&self) -> bool { match self { MessageAuthenticity::Anonymous => true, _ => false, @@ -147,6 +117,36 @@ impl MessageAuthenticity { } } +/// Event that can happen on the gossipsub behaviour. +#[derive(Debug)] +pub enum GossipsubEvent { + /// A message has been received. + Message { + /// The peer that forwarded us this message. + propagation_source: PeerId, + /// The `MessageId` of the message. This should be referenced by the application when + /// validating a message (if required). + message_id: MessageId, + /// The message itself. + message: GossipsubMessage, + }, + /// A remote subscribed to a topic. + Subscribed { + /// Remote that has subscribed. + peer_id: PeerId, + /// The topic it has subscribed to. + topic: TopicHash, + }, + + /// A remote unsubscribed from a topic. + Unsubscribed { + /// Remote that has unsubscribed. + peer_id: PeerId, + /// The topic it has subscribed from. + topic: TopicHash, + }, +} + /// A data structure for storing configuration for publishing messages. See [`MessageAuthenticity`] /// for further details. enum PublishConfig { @@ -198,17 +198,18 @@ impl From for PublishConfig { } } -/// Stores backoffs in an efficient manner +/// Stores backoffs in an efficient manner. struct BackoffStorage { - ///stores backoffs and the index in backoffs_by_heartbeat per peer per topic + /// Stores backoffs and the index in backoffs_by_heartbeat per peer per topic. backoffs: HashMap>, - /// stores peer topic pairs per heartbeat (this is cyclic the current index is heartbeat_index) + /// Stores peer topic pairs per heartbeat (this is cyclic the current index is + /// heartbeat_index). backoffs_by_heartbeat: Vec>, - /// the index in the backoffs_by_heartbeat vector corresponding to the current heartbeat + /// The index in the backoffs_by_heartbeat vector corresponding to the current heartbeat. heartbeat_index: usize, - /// the heartbeat interval duration from the config + /// The heartbeat interval duration from the config. heartbeat_interval: Duration, - /// backoff_slack from config + /// Backoff slack from the config. backoff_slack: u32, } @@ -223,7 +224,7 @@ impl BackoffStorage { heartbeat_interval: Duration, backoff_slack: u32, ) -> BackoffStorage { - //we add one additional slot for partial heartbeat + // We add one additional slot for partial heartbeat let max_heartbeats = Self::heartbeats(prune_backoff, &heartbeat_interval) + backoff_slack as usize + 1; BackoffStorage { @@ -235,8 +236,8 @@ impl BackoffStorage { } } - /// Updates the backoff for a peer (if there is already a more restrictive backup this call - /// doesn't change anything) + /// Updates the backoff for a peer (if there is already a more restrictive backoff than this call + /// doesn't change anything). pub fn update_backoff(&mut self, topic: &TopicHash, peer: &PeerId, time: Duration) { let instant = Instant::now() + time; let insert_into_backoffs_by_heartbeat = @@ -316,7 +317,7 @@ impl BackoffStorage { /// Applies a heartbeat. That should be called regularly in intervals of length /// `heartbeat_interval`. pub fn heartbeat(&mut self) { - //clean up backoffs_by_heartbeat + // Clean up backoffs_by_heartbeat if let Some(s) = self.backoffs_by_heartbeat.get_mut(self.heartbeat_index) { let backoffs = &mut self.backoffs; let slack = self.heartbeat_interval * self.backoff_slack; @@ -339,22 +340,11 @@ impl BackoffStorage { }); } - //increase heartbeat index + // Increase heartbeat index self.heartbeat_index = (self.heartbeat_index + 1) % self.backoffs_by_heartbeat.len(); } } -#[derive(Debug)] -pub enum MessageAcceptance { - /// The message is considered valid, and it should be delivered and forwarded to the network - Accept, - /// The message is considered invalid, and it should be rejected and trigger the P₄ penalty - Reject, - /// The message is neither delivered nor forwarded to the network, but the router does not - /// trigger the P₄ penalty. - Ignore, -} - /// Network behaviour that handles the gossipsub protocol. /// /// NOTE: Initialisation requires a [`MessageAuthenticity`] and [`GossipsubConfig`] instance. If message signing is @@ -377,7 +367,8 @@ pub struct Gossipsub { /// duplicates from being propagated to the application and on the network. duplication_cache: DuplicateCache, - /// maps peer ids to its protocol kind + /// A map of peers to their protocol kind. This is to identify different kinds of gossipsub + /// peers. peer_protocols: HashMap, /// A map of all connected peers - A map of topic hash to a list of gossipsub peer Ids. @@ -413,37 +404,40 @@ pub struct Gossipsub { /// We remember all peers we found through peer exchange, since those peers are not considered /// as safe as randomly discovered outbound peers. This behaviour diverges from the go - /// implementation to avoid possible love bombing attacks in px. When disconnecting peers will + /// implementation to avoid possible love bombing attacks in PX. When disconnecting peers will /// be removed from this list which may result in a true outbound rediscovery. px_peers: HashSet, /// Set of connected outbound peers (we only consider true outbound peers found through - /// discovery and not by px) + /// discovery and not by PX). outbound_peers: HashSet, /// Stores optional peer score data together with thresholds, decay interval and gossip /// promises. peer_score: Option<(PeerScore, PeerScoreThresholds, Interval, GossipPromises)>, - /// Counts the number of ihave received from each peer since the last heartbeat + /// Counts the number of `IHAVE` received from each peer since the last heartbeat. count_peer_have: HashMap, - /// Counts the number of iwant that we sent the each peer since the last heartbeat + /// Counts the number of `IWANT` that we sent the each peer since the last heartbeat. count_iasked: HashMap, } impl Gossipsub { /// Creates a `Gossipsub` struct given a set of parameters specified via a `GossipsubConfig`. - pub fn new(privacy: MessageAuthenticity, config: GossipsubConfig) -> Self { + pub fn new( + privacy: MessageAuthenticity, + config: GossipsubConfig, + ) -> Result { // Set up the router given the configuration settings. // We do not allow configurations where a published message would also be rejected if it // were received locally. - validate_config(&privacy, &config.validation_mode()); + validate_config(&privacy, &config.validation_mode())?; // Set up message publishing parameters. - Gossipsub { + Ok(Gossipsub { events: VecDeque::new(), control_pool: HashMap::new(), publish_config: privacy.into(), @@ -476,7 +470,7 @@ impl Gossipsub { count_peer_have: HashMap::new(), count_iasked: HashMap::new(), peer_protocols: HashMap::new(), - } + }) } /// Subscribe to a topic. @@ -597,7 +591,7 @@ impl Gossipsub { for topic_hash in &message.topics { if let Some(set) = self.topic_peers.get(&topic_hash) { if self.config.flood_publish() { - //forward to all peers above score + // Forward to all peers above score and all explicit peers recipient_peers.extend( set.iter() .filter(|p| { @@ -609,14 +603,14 @@ impl Gossipsub { continue; } - //explicit peers + // Explicit peers for peer in &self.explicit_peers { if set.contains(peer) { recipient_peers.insert(peer.clone()); } } - //floodsub peers + // Floodsub peers for (peer, kind) in &self.peer_protocols { if kind == &PeerKind::Floodsub && !self @@ -627,17 +621,16 @@ impl Gossipsub { } } - //gossipsub peers + // Gossipsub peers if self.mesh.get(&topic_hash).is_none() { debug!("Topic: {:?} not in the mesh", topic_hash); - // Build a list of peers to forward the message to - // if we have fanout peers add them to the map. + // If we have fanout peers add them to the map. if self.fanout.contains_key(&topic_hash) { for peer in self.fanout.get(&topic_hash).expect("Topic must exist") { recipient_peers.insert(peer.clone()); } } else { - // we have no fanout peers, select mesh_n of them and add them to the fanout + // We have no fanout peers, select mesh_n of them and add them to the fanout let mesh_n = self.config.mesh_n(); let new_peers = Self::get_random_peers( &self.topic_peers, @@ -648,19 +641,19 @@ impl Gossipsub { |p| { !self.explicit_peers.contains(p) && !self - .score_below_threshold(p, |ts| ts.publish_threshold) + .score_below_threshold(p, |pst| pst.publish_threshold) .0 } }, ); - // add the new peers to the fanout and recipient peers + // Add the new peers to the fanout and recipient peers self.fanout.insert(topic_hash.clone(), new_peers.clone()); for peer in new_peers { debug!("Peer added to fanout: {:?}", peer); recipient_peers.insert(peer.clone()); } } - // we are publishing to fanout peers - update the time we published + // We are publishing to fanout peers - update the time we published self.fanout_last_pub .insert(topic_hash.clone(), Instant::now()); } @@ -732,7 +725,7 @@ impl Gossipsub { }; if let Some(message) = self.mcache.remove(message_id) { - //tell peer_score about reject + // Tell peer_score about reject if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.reject_message(propagation_source, &message, reject_reason); } @@ -762,7 +755,7 @@ impl Gossipsub { /// Activates the peer scoring system with the given parameters. This will reset all scores /// if there was already another peer scoring system activated. Returns an error if the - /// params where not valid. + /// params are not valid. pub fn with_peer_score( &mut self, params: PeerScoreParams, @@ -777,9 +770,19 @@ impl Gossipsub { Ok(()) } - pub fn set_topic_params(&mut self, topic_hash: TopicHash, params: TopicScoreParams) { + /// Sets scoring parameters for a topic. + /// + /// The `with_peer_score()` must first be called to initialise peer scoring. + pub fn set_topic_params( + &mut self, + topic: Topic, + params: TopicScoreParams, + ) -> Result<(), &'static str> { if let Some((peer_score, ..)) = &mut self.peer_score { - peer_score.set_topic_params(topic_hash, params); + peer_score.set_topic_params(topic.hash(), params); + Ok(()) + } else { + Err("Peer score must be initialised with `with_peer_score()`") } } @@ -812,7 +815,8 @@ impl Gossipsub { "JOIN: Removing peers from the fanout for topic: {:?}", topic_hash ); - //remove explicit peers and peers with negative scores + + // remove explicit peers and peers with negative scores peers = peers .into_iter() .filter(|p| { @@ -820,8 +824,8 @@ impl Gossipsub { }) .collect(); - // add up to mesh_n of them them to the mesh - // Note: These aren't randomly added, currently FIFO + // Add up to mesh_n of them them to the mesh + // NOTE: These aren't randomly added, currently FIFO let add_peers = std::cmp::min(peers.len(), self.config.mesh_n()); debug!( "JOIN: Adding {:?} peers from the fanout for topic: {:?}", @@ -880,6 +884,7 @@ impl Gossipsub { debug!("Completed JOIN for topic: {:?}", topic_hash); } + /// Creates a PRUNE gossipsub action. fn make_prune( &mut self, topic_hash: &TopicHash, @@ -890,16 +895,25 @@ impl Gossipsub { peer_score.prune(peer, topic_hash.clone()); } - if let Some(PeerKind::Gossipsub) = self.peer_protocols.get(peer) { - // GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway - return GossipsubControlAction::Prune { - topic_hash: topic_hash.clone(), - peers: Vec::new(), - backoff: None, - }; + match self.peer_protocols.get(peer) { + Some(PeerKind::Floodsub) => { + error!("Attempted to prune a Floodsub peer"); + } + Some(PeerKind::Gossipsub) => { + // GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway + return GossipsubControlAction::Prune { + topic_hash: topic_hash.clone(), + peers: Vec::new(), + backoff: None, + }; + } + None => { + error!("Attempted to Prune an unknown peer"); + } + _ => {} // Gossipsub 1.1 peer perform the `Prune` } - //select peers for peer exchange + // Select peers for peer exchange let peers = if do_px { Self::get_random_peers( &self.topic_peers, @@ -915,7 +929,7 @@ impl Gossipsub { Vec::new() }; - //update backoff + // update backoff self.backoffs .update_backoff(topic_hash, peer, self.config.prune_backoff()); @@ -930,7 +944,7 @@ impl Gossipsub { fn leave(&mut self, topic_hash: &TopicHash) { debug!("Running LEAVE for topic {:?}", topic_hash); - // if our mesh contains the topic, send prune to peers and delete it from the mesh + // If our mesh contains the topic, send prune to peers and delete it from the mesh if let Some((_, peers)) = self.mesh.remove_entry(topic_hash) { for peer in peers { // Send a PRUNE control message @@ -942,10 +956,10 @@ impl Gossipsub { debug!("Completed LEAVE for topic: {:?}", topic_hash); } - ///Checks if the given peer is still connected and if not dials the peer again + /// Checks if the given peer is still connected and if not dials the peer again. fn check_explicit_peer_connection(&mut self, peer_id: &PeerId) { if !self.peer_topics.contains_key(peer_id) { - //connect to peer + // Connect to peer debug!("Connecting to explicit peer {:?}", peer_id); self.events.push_back(NetworkBehaviourAction::DialPeer { peer_id: peer_id.clone(), @@ -954,6 +968,8 @@ impl Gossipsub { } } + /// Determines if a peer's score is below a given `PeerScoreThreshold` chosen via the + /// `threshold` parameter. fn score_below_threshold( &self, peer_id: &PeerId, @@ -973,8 +989,8 @@ impl Gossipsub { /// Handles an IHAVE control message. Checks our cache of messages. If the message is unknown, /// requests it with an IWANT control message. fn handle_ihave(&mut self, peer_id: &PeerId, ihave_msgs: Vec<(TopicHash, Vec)>) { - // we ignore IHAVE gossip from any peer whose score is below the gossip threshold - if let (true, score) = self.score_below_threshold(peer_id, |ts| ts.gossip_threshold) { + // We ignore IHAVE gossip from any peer whose score is below the gossip threshold + if let (true, score) = self.score_below_threshold(peer_id, |pst| pst.gossip_threshold) { debug!( "IHAVE: ignoring peer {:?} with score below threshold [score = {}]", peer_id, score @@ -982,7 +998,7 @@ impl Gossipsub { return; } - //IHAVE flood protection + // IHAVE flood protection let peer_have = self.count_peer_have.entry(peer_id.clone()).or_insert(0); *peer_have += 1; if *peer_have > self.config.max_ihave_messages() { @@ -1031,7 +1047,7 @@ impl Gossipsub { let iasked = self.count_iasked.entry(peer_id.clone()).or_insert(0); let mut iask = iwant_ids.len(); if *iasked + iask > self.config.max_ihave_length() { - iask = self.config.max_ihave_length() - *iasked; + iask = self.config.max_ihave_length().saturating_sub(*iasked); } // Send the list of IWANT control messages @@ -1050,7 +1066,7 @@ impl Gossipsub { iwant_ids_vec.truncate(iask as usize); *iasked += iask; - let message_ids = iwant_ids_vec.into_iter().cloned().collect(); + let message_ids = iwant_ids_vec.into_iter().cloned().collect::>(); if let Some((_, _, _, gossip_promises)) = &mut self.peer_score { gossip_promises.add_promise( peer_id.clone(), @@ -1071,8 +1087,8 @@ impl Gossipsub { /// Handles an IWANT control message. Checks our cache of messages. If the message exists it is /// forwarded to the requesting peer. fn handle_iwant(&mut self, peer_id: &PeerId, iwant_msgs: Vec) { - // we ignore IWANT gossip from any peer whose score is below the gossip threshold - if let (true, score) = self.score_below_threshold(peer_id, |ts| ts.gossip_threshold) { + // We ignore IWANT gossip from any peer whose score is below the gossip threshold + if let (true, score) = self.score_below_threshold(peer_id, |pst| pst.gossip_threshold) { debug!( "IWANT: ignoring peer {:?} with score below threshold [score = {}]", peer_id, score @@ -1085,7 +1101,7 @@ impl Gossipsub { let mut cached_messages = HashMap::new(); for id in iwant_msgs { - // if we have it and the ihave count is not above the threshold, add it do the + // If we have it and the IHAVE count is not above the threshold, add it do the // cached_messages mapping if let Some((msg, count)) = self.mcache.get_with_iwant_counts(&id, peer_id) { if count > self.config.gossip_retransimission() { @@ -1113,13 +1129,13 @@ impl Gossipsub { }, ); } - debug!("Completed IWANT handling for peer: {:?}", peer_id); + debug!("Completed IWANT handling for peer: {}", peer_id); } /// Handles GRAFT control messages. If subscribed to the topic, adds the peer to mesh, if not, /// responds with PRUNE messages. fn handle_graft(&mut self, peer_id: &PeerId, topics: Vec) { - debug!("Handling GRAFT message for peer: {:?}", peer_id); + debug!("Handling GRAFT message for peer: {}", peer_id); let mut to_prune_topics = HashSet::new(); @@ -1127,7 +1143,7 @@ impl Gossipsub { // we don't GRAFT to/from explicit peers; complain loudly if this happens if self.explicit_peers.contains(peer_id) { - warn!("GRAFT: ignoring request from direct peer {:?}", peer_id); + warn!("GRAFT: ignoring request from direct peer {}", peer_id); // this is possibly a bug from non-reciprocal configuration; send a PRUNE for all topics to_prune_topics = HashSet::from_iter(topics.into_iter()); // but don't PX @@ -1137,7 +1153,7 @@ impl Gossipsub { let now = Instant::now(); for topic_hash in topics { if let Some(peers) = self.mesh.get_mut(&topic_hash) { - //if the peer is already in the mesh ignore the graft + // if the peer is already in the mesh ignore the graft if peers.contains(peer_id) { continue; } @@ -1146,14 +1162,17 @@ impl Gossipsub { if let Some(backoff_time) = self.backoffs.get_backoff_time(&topic_hash, peer_id) { if backoff_time > now { - debug!("GRAFT: ignoring backed off peer {:?}", peer_id); - //add behavioural penalty + warn!( + "GRAFT: peer attempted graft within backoff time, penalizing {}", + peer_id + ); + // add behavioural penalty if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.add_penalty(peer_id, 1); // check the flood cutoff - let flood_cutoff = backoff_time - + self.config.graft_flood_threshold() + let flood_cutoff = (backoff_time + + self.config.graft_flood_threshold()) - self.config.prune_backoff(); if flood_cutoff > now { //extra penalty @@ -1183,8 +1202,8 @@ impl Gossipsub { continue; } - //check mesh upper bound and only allow graft if upperbound not reached or - //if it is an outbound peer + // check mesh upper bound and only allow graft if the upper bound is not reached or + // if it is an outbound peer if peers.len() >= self.config.mesh_n_high() && !self.outbound_peers.contains(peer_id) { @@ -1219,7 +1238,7 @@ impl Gossipsub { .collect(); // Send the prune messages to the peer info!( - "GRAFT: Not subscribed to topics - Sending PRUNE to peer: {:?}", + "GRAFT: Not subscribed to topics - Sending PRUNE to peer: {}", peer_id ); self.send_message( @@ -1231,7 +1250,7 @@ impl Gossipsub { }, ); } - debug!("Completed GRAFT handling for peer: {:?}", peer_id); + debug!("Completed GRAFT handling for peer: {}", peer_id); } /// Handles PRUNE control messages. Removes peer from the mesh. @@ -1240,15 +1259,15 @@ impl Gossipsub { peer_id: &PeerId, prune_data: Vec<(TopicHash, Vec, Option)>, ) { - debug!("Handling PRUNE message for peer: {}", peer_id.to_string()); + debug!("Handling PRUNE message for peer: {}", peer_id); let (below_threshold, score) = - self.score_below_threshold(peer_id, |ts| ts.accept_px_threshold); + self.score_below_threshold(peer_id, |pst| pst.accept_px_threshold); for (topic_hash, px, backoff) in prune_data { if let Some(peers) = self.mesh.get_mut(&topic_hash) { // remove the peer if it exists in the mesh if peers.remove(peer_id) { info!( - "PRUNE: Removing peer: {} from the mesh for topic: {:?}", + "PRUNE: Removing peer: {} from the mesh for topic: {}", peer_id.to_string(), topic_hash ); @@ -1281,7 +1300,15 @@ impl Gossipsub { continue; } - self.px_connect(px); + // NOTE: We cannot dial any peers from PX currently as we typically will not + // know their multiaddr. Until SignedRecords are spec'd this + // remains a stub. By default `config.prune_peers()` is set to zero and + // this is skipped. If the user modifies this, this will only be able to + // dial already known peers (from an external discovery mechanism for + // example). + if self.config.prune_peers() > 0 { + self.px_connect(px); + } } } } @@ -1290,24 +1317,24 @@ impl Gossipsub { fn px_connect(&mut self, mut px: Vec) { let n = self.config.prune_peers(); - //ignore peerInfo with no ID - //TODO can we use peerInfo without any IDs if they have a signed peer record? + // Ignore peerInfo with no ID + //TODO: Once signed records are spec'd: Can we use peerInfo without any IDs if they have a signed peer record? px = px.into_iter().filter(|p| p.peer_id.is_some()).collect(); if px.len() > n { - //only use at most prune_peers many random peers + // only use at most prune_peers many random peers let mut rng = thread_rng(); px.partial_shuffle(&mut rng, n); px = px.into_iter().take(n).collect(); } for p in px { - //TODO extract signed peer record if given and handle it, see + // TODO: Once signed records are spec'd: extract signed peer record if given and handle it, see // https://github.com/libp2p/specs/pull/217 if let Some(peer_id) = p.peer_id { - //mark as px peer + // mark as px peer self.px_peers.insert(peer_id.clone()); - //dial peer + // dial peer self.events.push_back(NetworkBehaviourAction::DialPeer { peer_id, condition: DialPeerCondition::Disconnected, @@ -1335,15 +1362,12 @@ impl Gossipsub { // reject messages claiming to be from ourselves but not locally published if let Some(own_id) = self.publish_config.get_own_id() { - //TODO remove this "hack" as soon as lighthouse uses Anonymous instead of this fixed - // PeerId. - let lighthouse_anonymous_id = PeerId::from_bytes(vec![0, 1, 0]).expect("Valid peer id"); - if own_id != &lighthouse_anonymous_id + if !self.config.allow_self_origin() && own_id != propagation_source && msg.source.as_ref().map_or(false, |s| s == own_id) { debug!( - "Dropping message {:?} claiming to be from self but forwarded from {:?}", + "Dropping message {} claiming to be from self but forwarded from {}", msg_id, propagation_source ); if let Some((peer_score, _, _, gossip_promises)) = &mut self.peer_score { @@ -1356,23 +1380,24 @@ impl Gossipsub { // Add the message to the duplication cache and memcache. if !self.duplication_cache.insert(msg_id.clone()) { - debug!("Message already received, ignoring. Message: {:?}", msg_id); + debug!("Message already received, ignoring. Message: {}", msg_id); if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.duplicated_message(propagation_source, &msg); } return; } - //tells score that message arrived (but is maybe not fully validated yet) - //Consider message as delivered for gossip promises + // Tells score that message arrived (but is maybe not fully validated yet) + // Consider message as delivered for gossip promises if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score { peer_score.validate_message(propagation_source, &msg); gossip_promises.deliver_message(&msg_id); } + // Add the message to our memcache self.mcache.put(msg.clone()); - // dispatch the message to the user + // Dispatch the message to the user if we are subscribed to any of the topics if self.mesh.keys().any(|t| msg.topics.iter().any(|u| t == u)) { debug!("Sending received message to user"); self.events.push_back(NetworkBehaviourAction::GenerateEvent( @@ -1382,6 +1407,12 @@ impl Gossipsub { message: msg.clone(), }, )); + } else { + debug!( + "Received message on a topic we are not subscribed to. Topics {:?}", + msg.topics.iter().collect::>() + ); + return; } // forward the message to mesh peers, if no validation is required @@ -1421,7 +1452,7 @@ impl Gossipsub { let mut application_event = Vec::new(); for subscription in subscriptions { - // get the peers from the mapping, or insert empty lists if topic doesn't exist + // get the peers from the mapping, or insert empty lists if the topic doesn't exist let peer_list = self .topic_peers .entry(subscription.topic_hash.clone()) @@ -1534,6 +1565,7 @@ impl Gossipsub { ); } + /// Applies penalties to peers that did not respond to our IWANT requests. fn apply_iwant_penalties(&mut self) { if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score { for (peer, count) in gossip_promises.get_broken_promises() { @@ -1552,24 +1584,24 @@ impl Gossipsub { let mut to_prune = HashMap::new(); let mut no_px = HashSet::new(); - //clean up expired backoffs + // clean up expired backoffs self.backoffs.heartbeat(); // clean up ihave counters self.count_iasked.clear(); self.count_peer_have.clear(); - //apply iwant penalties + // apply iwant penalties self.apply_iwant_penalties(); - //check connections to explicit peers + // check connections to explicit peers if self.heartbeat_ticks % self.config.check_explicit_peers_ticks() == 0 { for p in self.explicit_peers.clone() { self.check_explicit_peer_connection(&p); } } - //cache scores throughout the heartbeat + // cache scores throughout the heartbeat let mut scores = HashMap::new(); let peer_score = &self.peer_score; let mut score = |p: &PeerId| match peer_score { @@ -1662,10 +1694,10 @@ impl Gossipsub { shuffled.shuffle(&mut rng); shuffled .sort_by(|p1, p2| score(p1).partial_cmp(&score(p2)).unwrap_or(Ordering::Equal)); - //shuffle everything except the last retain_scores many peers (the best ones) + // shuffle everything except the last retain_scores many peers (the best ones) shuffled[..peers.len() - self.config.retain_scores()].shuffle(&mut rng); - //count total number of outbound peers + // count total number of outbound peers let mut outbound = { let outbound_peers = &self.outbound_peers; shuffled @@ -1701,10 +1733,10 @@ impl Gossipsub { // do we have enough outbound peers? if peers.len() >= self.config.mesh_n_low() { - //count number of outbound peers we have + // count number of outbound peers we have let outbound = { peers.iter().filter(|p| outbound_peers.contains(*p)).count() }; - //if we have not enough outbound peers, graft to some new outbound peers + // if we have not enough outbound peers, graft to some new outbound peers if outbound < self.config.mesh_outbound_min() { let needed = self.config.mesh_outbound_min() - outbound; let peer_list = Self::get_random_peers( @@ -1928,7 +1960,7 @@ impl Gossipsub { let mut peer_message_ids = message_ids.clone(); if peer_message_ids.len() > self.config.max_ihave_length() { - // we do this per peer so that we emit a different set for each peer. + // We do this per peer so that we emit a different set for each peer. // we have enough redundancy in the system that this will significantly increase // the message coverage when we do truncate. peer_message_ids.partial_shuffle(&mut rng, self.config.max_ihave_length()); @@ -2022,12 +2054,16 @@ impl Gossipsub { /// Helper function which forwards a message to mesh\[topic\] peers. /// Returns true if at least one peer was messaged. - fn forward_msg(&mut self, message: GossipsubMessage, source: Option<&PeerId>) -> bool { + fn forward_msg( + &mut self, + message: GossipsubMessage, + propagation_source: Option<&PeerId>, + ) -> bool { let msg_id = (self.config.message_id_fn())(&message); - //message is fully validated inform peer_score - if let Some((peer_score, _, _, gossip_promises)) = &mut self.peer_score { - if let Some(peer) = source { + // message is fully validated inform peer_score + if let Some((peer_score, ..)) = &mut self.peer_score { + if let Some(peer) = propagation_source { peer_score.deliver_message(peer, &message); } } @@ -2040,17 +2076,22 @@ impl Gossipsub { // mesh if let Some(mesh_peers) = self.mesh.get(&topic) { for peer_id in mesh_peers { - if Some(peer_id) != source { + if Some(peer_id) != propagation_source + && Some(peer_id) != message.source.as_ref() + { recipient_peers.insert(peer_id.clone()); } } } } - //add explicit peers + // Add explicit peers for p in &self.explicit_peers { if let Some(topics) = self.peer_topics.get(p) { - if Some(p) != source && message.topics.iter().any(|t| topics.contains(t)) { + if Some(p) != propagation_source + && Some(p) != message.source.as_ref() + && message.topics.iter().any(|t| topics.contains(t)) + { recipient_peers.insert(p.clone()); } } @@ -2294,8 +2335,8 @@ impl NetworkBehaviour for Gossipsub { Vec::new() } - fn inject_connected(&mut self, id: &PeerId) { - info!("New peer connected: {:?}", id); + fn inject_connected(&mut self, peer_id: &PeerId) { + info!("New peer connected: {}", peer_id); // We need to send our subscriptions to the newly-connected node. let mut subscriptions = vec![]; for topic_hash in self.mesh.keys() { @@ -2308,7 +2349,7 @@ impl NetworkBehaviour for Gossipsub { if !subscriptions.is_empty() { // send our subscriptions to the peer self.send_message( - id.clone(), + peer_id.clone(), GossipsubRpc { messages: Vec::new(), subscriptions, @@ -2317,24 +2358,28 @@ impl NetworkBehaviour for Gossipsub { ); } - // For the time being assume all gossipsub peers - self.peer_topics.insert(id.clone(), Default::default()); + // Insert an empty set of the topics of this peer until known. + self.peer_topics.insert(peer_id.clone(), Default::default()); - // By default we assume a peer is only a floodsub peer + // By default we assume a peer is only a floodsub peer. + // + // The protocol negotiation occurs once a message is sent/received. Once this happens we + // update the type of peer that this is in order to determine which kind of routing should + // occur. self.peer_protocols - .entry(id.clone()) + .entry(peer_id.clone()) .or_insert(PeerKind::Floodsub); if let Some((peer_score, ..)) = &mut self.peer_score { - peer_score.add_peer(id.clone()); + peer_score.add_peer(peer_id.clone()); } } - fn inject_disconnected(&mut self, id: &PeerId) { - // remove from mesh, topic_peers, peer_topic and fanout - debug!("Peer disconnected: {:?}", id); + fn inject_disconnected(&mut self, peer_id: &PeerId) { + // remove from mesh, topic_peers, peer_topic and the fanout + debug!("Peer disconnected: {}", peer_id); { - let topics = match self.peer_topics.get(id) { + let topics = match self.peer_topics.get(peer_id) { Some(topics) => (topics), None => { warn!("Disconnected node, not in connected nodes"); @@ -2347,39 +2392,44 @@ impl NetworkBehaviour for Gossipsub { // check the mesh for the topic if let Some(mesh_peers) = self.mesh.get_mut(&topic) { // check if the peer is in the mesh and remove it - mesh_peers.remove(id); + mesh_peers.remove(peer_id); } // remove from topic_peers if let Some(peer_list) = self.topic_peers.get_mut(&topic) { - if !peer_list.remove(id) { + if !peer_list.remove(peer_id) { // debugging purposes - warn!("Disconnected node: {:?} not in topic_peers peer list", &id); + warn!( + "Disconnected node: {} not in topic_peers peer list", + peer_id + ); } } else { warn!( - "Disconnected node: {:?} with topic: {:?} not in topic_peers", - &id, &topic + "Disconnected node: {} with topic: {:?} not in topic_peers", + &peer_id, &topic ); } // remove from fanout - self.fanout.get_mut(&topic).map(|peers| peers.remove(id)); + self.fanout + .get_mut(&topic) + .map(|peers| peers.remove(peer_id)); } //forget px and outbound status for this peer - self.px_peers.remove(id); - self.outbound_peers.remove(id); + self.px_peers.remove(peer_id); + self.outbound_peers.remove(peer_id); } - // remove peer from peer_topics and peer_protocols - let was_in = self.peer_topics.remove(id); - debug_assert!(was_in.is_some()); - let was_in = self.peer_protocols.remove(id); - debug_assert!(was_in.is_some()); + // Remove peer from peer_topics and peer_protocols + // NOTE: It is possible the peer has already been removed from all mappings if it does not + // support the protocol. + self.peer_topics.remove(peer_id); + self.peer_protocols.remove(peer_id); if let Some((peer_score, ..)) = &mut self.peer_score { - peer_score.remove_peer(id); + peer_score.remove_peer(peer_id); } } @@ -2389,21 +2439,21 @@ impl NetworkBehaviour for Gossipsub { _: &ConnectionId, endpoint: &ConnectedPoint, ) { - //check if the peer is an outbound peer + // Check if the peer is an outbound peer if let ConnectedPoint::Dialer { .. } = endpoint { - //Diverging from the go implementation we only want to consider a peer as outbound peer - //if its first connection is outbound. To check if this connection is the first we - //check if the peer isn't connected yet. This only works because the - //`inject_connection_established` event for the first connection gets called immediately - //before `inject_connected` gets called. + // Diverging from the go implementation we only want to consider a peer as outbound peer + // if its first connection is outbound. To check if this connection is the first we + // check if the peer isn't connected yet. This only works because the + // `inject_connection_established` event for the first connection gets called immediately + // before `inject_connected` gets called. if !self.peer_topics.contains_key(peer) && !self.px_peers.contains(peer) { - //the first connection is outbound and it is not a peer from peer exchange => mark - //it as outbound peer + // The first connection is outbound and it is not a peer from peer exchange => mark + // it as outbound peer self.outbound_peers.insert(peer.clone()); } } - //add ip to peer scoring system + // Add the IP to the peer scoring system if let Some((peer_score, ..)) = &mut self.peer_score { if let Some(ip) = get_ip_addr(get_remote_addr(endpoint)) { peer_score.add_ip(peer.clone(), ip); @@ -2417,7 +2467,7 @@ impl NetworkBehaviour for Gossipsub { _: &ConnectionId, endpoint: &ConnectedPoint, ) { - //remove ip from peer scoring system + // Remove IP from peer scoring system if let Some((peer_score, ..)) = &mut self.peer_score { if let Some(ip) = get_ip_addr(get_remote_addr(endpoint)) { peer_score.remove_ip(peer, &ip); @@ -2432,7 +2482,7 @@ impl NetworkBehaviour for Gossipsub { endpoint_old: &ConnectedPoint, endpoint_new: &ConnectedPoint, ) { - //exchange ip in peer scoring system + // Exchange IP in peer scoring system if let Some((peer_score, ..)) = &mut self.peer_score { if let Some(ip) = get_ip_addr(get_remote_addr(endpoint_old)) { peer_score.remove_ip(peer, &ip); @@ -2451,8 +2501,9 @@ impl NetworkBehaviour for Gossipsub { ) { match handler_event { HandlerEvent::PeerKind(kind) => { + // We have identified the protocol this peer is using if let PeerKind::NotSupported = kind { - //we treat this peer as disconnected + // We treat this peer as disconnected self.inject_disconnected(&propagation_source); } else if let Some(old_kind) = self.peer_protocols.get_mut(&propagation_source) { // Only change the value if the old value is Floodsub (the default set in @@ -2468,20 +2519,22 @@ impl NetworkBehaviour for Gossipsub { } => { // Handle any invalid messages from this peer if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score { - let mut id_fn = self.config.message_id_fn(); + let id_fn = self.config.message_id_fn(); for (_message, validation_error) in invalid_messages { + warn!("Message rejected. Reason: {:?}", validation_error); let reason = RejectReason::ProtocolValidationError(validation_error); peer_score.reject_message(&propagation_source, &_message, reason); gossip_promises.reject_message(&id_fn(&_message), &reason); } } - // Handle the gossipsub RPC + // Handle the Gossipsub RPC // Check if peer is graylisted in which case we ignore the event if let (true, _) = - self.score_below_threshold(&propagation_source, |ts| ts.graylist_threshold) + self.score_below_threshold(&propagation_source, |pst| pst.graylist_threshold) { + debug!("RPC Dropped from greylisted peer {}", propagation_source); return; } @@ -2573,7 +2626,7 @@ impl NetworkBehaviour for Gossipsub { }); } - //update scores + // update scores if let Some((peer_score, _, interval, _)) = &mut self.peer_score { while let Poll::Ready(Some(())) = interval.poll_next_unpin(cx) { peer_score.refresh_scores(); @@ -2590,20 +2643,23 @@ impl NetworkBehaviour for Gossipsub { /// Validates the combination of signing, privacy and message validation to ensure the /// configuration will not reject published messages. -fn validate_config(authenticity: &MessageAuthenticity, validation_mode: &ValidationMode) { +fn validate_config( + authenticity: &MessageAuthenticity, + validation_mode: &ValidationMode, +) -> Result<(), &'static str> { match validation_mode { ValidationMode::Anonymous => { if authenticity.is_signing() { - panic!("Cannot enable message signing with an Anonymous validation mode. Consider changing either the ValidationMode or MessageAuthenticity"); + return Err("Cannot enable message signing with an Anonymous validation mode. Consider changing either the ValidationMode or MessageAuthenticity"); } if !authenticity.is_anonymous() { - panic!("Published messages contain an author but incoming messages with an author will be rejected. Consider adjusting the validation or privacy settings in the config"); + return Err("Published messages contain an author but incoming messages with an author will be rejected. Consider adjusting the validation or privacy settings in the config"); } } ValidationMode::Strict => { if !authenticity.is_signing() { - panic!( + return Err( "Messages will be published unsigned and incoming unsigned messages will be rejected. Consider adjusting the validation or privacy settings in the config" @@ -2612,6 +2668,7 @@ fn validate_config(authenticity: &MessageAuthenticity, validation_mode: &Validat } _ => {} } + Ok(()) } impl fmt::Debug for Gossipsub { diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 1d5efc7ab53..200fc468e2d 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -108,7 +108,8 @@ mod tests { ) -> (Gossipsub, Vec, Vec) { let keypair = libp2p_core::identity::Keypair::generate_secp256k1(); // create a gossipsub struct - let mut gs: Gossipsub = Gossipsub::new(MessageAuthenticity::Signed(keypair), gs_config); + let mut gs: Gossipsub = + Gossipsub::new(MessageAuthenticity::Signed(keypair), gs_config).unwrap(); if let Some((scoring_params, scoring_thresholds)) = scoring { gs.with_peer_score(scoring_params, scoring_thresholds) @@ -709,7 +710,7 @@ mod tests { .build() .unwrap(); // create a gossipsub struct - let mut gs: Gossipsub = Gossipsub::new(MessageAuthenticity::Anonymous, gs_config); + let mut gs: Gossipsub = Gossipsub::new(MessageAuthenticity::Anonymous, gs_config).unwrap(); // create a topic and fill it with some peers let topic_hash = Topic::new("Test").hash().clone(); @@ -2118,9 +2119,13 @@ mod tests { #[test] fn test_only_send_nonnegative_scoring_peers_in_px() { - let config = GossipsubConfig::default(); + let config = GossipsubConfigBuilder::new() + .prune_peers(16) + .do_px() + .build() + .unwrap(); - //build mesh with three peer + // Build mesh with three peer let (mut gs, peers, topics) = build_and_inject_nodes_with_config_and_explicit_and_outbound_and_scoring( 3, @@ -2132,10 +2137,10 @@ mod tests { Some((PeerScoreParams::default(), PeerScoreThresholds::default())), ); - //penalize first peer + // Penalize first peer gs.peer_score.as_mut().unwrap().0.add_penalty(&peers[0], 1); - //prune second peer + // Prune second peer gs.send_graft_prune( HashMap::new(), vec![(peers[1].clone(), vec![topics[0].clone()])] @@ -2144,7 +2149,7 @@ mod tests { HashSet::new(), ); - //check that px in prune message only contains third peer + // Check that px in prune message only contains third peer assert_eq!( count_control_msgs(&gs, |peer_id, m| peer_id == &peers[1] && match m { @@ -2169,7 +2174,7 @@ mod tests { let mut peer_score_thresholds = PeerScoreThresholds::default(); peer_score_thresholds.gossip_threshold = 3.0 * peer_score_params.behaviour_penalty_weight; - //build full mesh + // Build full mesh let (mut gs, peers, topics) = build_and_inject_nodes_with_config_and_explicit_and_outbound_and_scoring( config.mesh_n_high(), @@ -2181,24 +2186,24 @@ mod tests { Some((peer_score_params, peer_score_thresholds)), ); - // graft all the peer + // Graft all the peer for peer in peers { gs.handle_graft(&peer, topics.clone()); } - //add two additional peers that will not be part of the mesh + // Add two additional peers that will not be part of the mesh let p1 = add_peer(&mut gs, &topics, false, false); let p2 = add_peer(&mut gs, &topics, false, false); - //reduce score of p1 below peer_score_thresholds.gossip_threshold - //note that penalties get squared so two penalties means a score of + // Reduce score of p1 below peer_score_thresholds.gossip_threshold + // note that penalties get squared so two penalties means a score of // 4 * peer_score_params.behaviour_penalty_weight. gs.peer_score.as_mut().unwrap().0.add_penalty(&p1, 2); - //reduce score of p2 below 0 but not below peer_score_thresholds.gossip_threshold + // Reduce score of p2 below 0 but not below peer_score_thresholds.gossip_threshold gs.peer_score.as_mut().unwrap().0.add_penalty(&p2, 1); - //receive message + // Receive message let message = GossipsubMessage { source: Some(PeerId::random()), data: vec![], @@ -2210,11 +2215,11 @@ mod tests { }; gs.handle_received_message(message.clone(), &PeerId::random()); - //emit gossip + // Emit gossip gs.emit_gossip(); let msg_id = (gs.config.message_id_fn())(&message); - //check that exactly one gossip messages got sent and it got sent to p2 + // Check that exactly one gossip messages got sent and it got sent to p2 assert_eq!( count_control_msgs(&gs, |peer, action| match action { GossipsubControlAction::IHave { @@ -2241,7 +2246,7 @@ mod tests { let mut peer_score_thresholds = PeerScoreThresholds::default(); peer_score_thresholds.gossip_threshold = 3.0 * peer_score_params.behaviour_penalty_weight; - //build full mesh + // Build full mesh let (mut gs, peers, topics) = build_and_inject_nodes_with_config_and_explicit_and_outbound_and_scoring( config.mesh_n_high(), @@ -2253,24 +2258,24 @@ mod tests { Some((peer_score_params, peer_score_thresholds)), ); - // graft all the peer + // Graft all the peer for peer in peers { gs.handle_graft(&peer, topics.clone()); } - //add two additional peers that will not be part of the mesh + // Add two additional peers that will not be part of the mesh let p1 = add_peer(&mut gs, &topics, false, false); let p2 = add_peer(&mut gs, &topics, false, false); - //reduce score of p1 below peer_score_thresholds.gossip_threshold - //note that penalties get squared so two penalties means a score of + // Reduce score of p1 below peer_score_thresholds.gossip_threshold + // note that penalties get squared so two penalties means a score of // 4 * peer_score_params.behaviour_penalty_weight. gs.peer_score.as_mut().unwrap().0.add_penalty(&p1, 2); - //reduce score of p2 below 0 but not below peer_score_thresholds.gossip_threshold + // Reduce score of p2 below 0 but not below peer_score_thresholds.gossip_threshold gs.peer_score.as_mut().unwrap().0.add_penalty(&p2, 1); - //receive message + // Rreceive message let message = GossipsubMessage { source: Some(PeerId::random()), data: vec![], @@ -2634,12 +2639,15 @@ mod tests { #[test] fn test_ignore_px_from_peers_below_accept_px_threshold() { - let config = GossipsubConfig::default(); + let config = GossipsubConfigBuilder::new() + .prune_peers(16) + .build() + .unwrap(); let peer_score_params = PeerScoreParams::default(); let mut peer_score_thresholds = PeerScoreThresholds::default(); peer_score_thresholds.accept_px_threshold = peer_score_params.app_specific_weight; - //build mesh with two peer + // Build mesh with two peers let (mut gs, peers, topics) = build_and_inject_nodes_with_config_and_explicit_and_outbound_and_scoring( 2, @@ -2651,13 +2659,13 @@ mod tests { Some((peer_score_params, peer_score_thresholds)), ); - //increase score of first peer to less than accept_px_threshold + // Decrease score of first peer to less than accept_px_threshold gs.set_application_score(&peers[0], 0.99); - //increase score of second peer to accept_px_threshold + // Increase score of second peer to accept_px_threshold gs.set_application_score(&peers[1], 1.0); - //handle prune from peer peers[0] with px peers + // Handle prune from peer peers[0] with px peers let px = vec![PeerInfo { peer_id: Some(PeerId::random()), }]; @@ -2670,7 +2678,7 @@ mod tests { )], ); - //assert no dials + // Assert no dials assert_eq!( gs.events .iter() @@ -2745,14 +2753,9 @@ mod tests { assert_eq!(gs.mesh[&topics[0]].len(), n); - dbg!(&peers); - dbg!(&gs.mesh[&topics[0]]); - //heartbeat to prune some peers gs.heartbeat(); - dbg!(&gs.mesh[&topics[0]]); - assert_eq!(gs.mesh[&topics[0]].len(), config.mesh_n()); //mesh contains retain_scores best peers diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs index 462fc59e6f8..25d0e4de636 100644 --- a/protocols/gossipsub/src/config.rs +++ b/protocols/gossipsub/src/config.rs @@ -132,8 +132,13 @@ pub struct GossipsubConfig { /// the message id. message_id_fn: fn(&GossipsubMessage) -> MessageId, + /// By default, gossipsub will reject messages that are sent to us that has the same message + /// source as we have specified locally. Enabling this, allows these messages and prevents + /// penalizing the peer that sent us the message. Default is false. + allow_self_origin: bool, + /// Whether Peer eXchange is enabled; this should be enabled in bootstrappers and other well - /// connected/trusted nodes. The default is true. + /// connected/trusted nodes. The default is false. do_px: bool, /// Controls the number of peers to include in prune Peer eXchange. @@ -197,7 +202,7 @@ pub struct GossipsubConfig { max_ihave_length: usize, /// GossipSubMaxIHaveMessages is the maximum number of IHAVE messages to accept from a peer - /// within a heartbeat. + /// within a heartbeat. The default is 10. max_ihave_messages: usize, /// Time to wait for a message requested through IWANT following an IHAVE advertisement. @@ -209,7 +214,6 @@ pub struct GossipsubConfig { support_floodsub: bool, } -//TODO use a macro for getters + the builder impl GossipsubConfig { //all the getters @@ -331,6 +335,13 @@ impl GossipsubConfig { self.message_id_fn } + /// By default, gossipsub will reject messages that are sent to us that has the same message + /// source as we have specified locally. Enabling this, allows these messages and prevents + /// penalizing the peer that sent us the message. Default is false. + pub fn allow_self_origin(&self) -> bool { + self.allow_self_origin + } + /// Whether Peer eXchange is enabled; this should be enabled in bootstrappers and other well /// connected/trusted nodes. The default is true. pub fn do_px(&self) -> bool { @@ -341,7 +352,8 @@ impl GossipsubConfig { /// When we prune a peer that's eligible for PX (has a good score, etc), we will try to /// send them signed peer records for up to `prune_peers` other peers that we /// know of. It is recommended that this value is larger than `mesh_n_high` so that the pruned - /// peer can reliably form a full mesh. The default is 16. + /// peer can reliably form a full mesh. The default is typically 16 however until signed + /// records are spec'd this is disabled and set to 0. pub fn prune_peers(&self) -> usize { self.prune_peers } @@ -499,8 +511,9 @@ impl GossipsubConfigBuilder { .push_str(&message.sequence_number.unwrap_or_default().to_string()); MessageId::from(source_string) }, - do_px: true, - prune_peers: 16, + allow_self_origin: false, + do_px: false, + prune_peers: 0, // NOTE: Increasing this currently has little effect until Signed records are implemented. prune_backoff: Duration::from_secs(60), backoff_slack: 1, flood_publish: true, @@ -646,10 +659,10 @@ impl GossipsubConfigBuilder { self } - /// Whether Peer eXchange is enabled; this should be enabled in bootstrappers and other well + /// Enables Peer eXchange. This should be enabled in bootstrappers and other well /// connected/trusted nodes. The default is true. - pub fn do_px(&mut self, do_px: bool) -> &mut Self { - self.config.do_px = do_px; + pub fn do_px(&mut self) -> &mut Self { + self.config.do_px = true; self } @@ -805,6 +818,7 @@ impl std::fmt::Debug for GossipsubConfig { let _ = builder.field("duplicate_cache_time", &self.duplicate_cache_time); let _ = builder.field("validate_messages", &self.validate_messages); let _ = builder.field("validation_mode", &self.validation_mode); + let _ = builder.field("allow_self_origin", &self.allow_self_origin); let _ = builder.field("do_px", &self.do_px); let _ = builder.field("prune_peers", &self.prune_peers); let _ = builder.field("prune_backoff", &self.prune_backoff); diff --git a/protocols/gossipsub/src/gossip_promises.rs b/protocols/gossipsub/src/gossip_promises.rs index 4419c04f4f2..c2a5d7787aa 100644 --- a/protocols/gossipsub/src/gossip_promises.rs +++ b/protocols/gossipsub/src/gossip_promises.rs @@ -1,3 +1,23 @@ +// Copyright 2020 Sigma Prime Pty Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + use crate::error::ValidationError; use crate::peer_score::RejectReason; use crate::MessageId; @@ -8,23 +28,23 @@ use rand::thread_rng; use std::collections::HashMap; use wasm_timer::Instant; -///struct that tracks recently sent iwant messages and checks if peers respond to them -///for each iwant message we track one random requested message id +/// Tracks recently sent `IWANT` messages and checks if peers respond to them +/// for each `IWANT` message we track one random requested message id. #[derive(Default)] pub(crate) struct GossipPromises { - // stores for each tracked message id and peer the instant when this promise expires - // if the peer didn't respond until then we consider the promise as broken and penalize the + // Stores for each tracked message id and peer the instant when this promise expires. + // If the peer didn't respond until then we consider the promise as broken and penalize the // peer. promises: HashMap>, } impl GossipPromises { /// Track a promise to deliver a message from a list of msgIDs we are requesting. - pub fn add_promise(&mut self, peer: PeerId, messages: &Vec, expires: Instant) { - //randomly select a message id + pub fn add_promise(&mut self, peer: PeerId, messages: &[MessageId], expires: Instant) { + // Randomly select a message id let mut rng = thread_rng(); if let Some(message_id) = messages.choose(&mut rng) { - //if a promise for this message id and peer already exists we don't update expires! + // If a promise for this message id and peer already exists we don't update expires! self.promises .entry(message_id.clone()) .or_insert_with(|| HashMap::new()) @@ -34,7 +54,7 @@ impl GossipPromises { } pub fn deliver_message(&mut self, message_id: &MessageId) { - //someone delivered a message, we can stop tracking all promises for it + // Someone delivered a message, we can stop tracking all promises for it self.promises.remove(message_id); } diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs index 0b5fab4a6b5..b4262d35f26 100644 --- a/protocols/gossipsub/src/lib.rs +++ b/protocols/gossipsub/src/lib.rs @@ -152,13 +152,13 @@ mod rpc_proto { include!(concat!(env!("OUT_DIR"), "/gossipsub.pb.rs")); } -pub use self::behaviour::{Gossipsub, GossipsubEvent, MessageAcceptance, MessageAuthenticity}; +pub use self::behaviour::{Gossipsub, GossipsubEvent, MessageAuthenticity}; pub use self::config::{GossipsubConfig, GossipsubConfigBuilder, ValidationMode}; pub use self::peer_score::{ score_parameter_decay, score_parameter_decay_with_base, PeerScoreParams, PeerScoreThresholds, TopicScoreParams, }; pub use self::topic::{Hasher, Topic, TopicHash}; -pub use self::types::{GossipsubMessage, GossipsubRpc, MessageId}; +pub use self::types::{GossipsubMessage, GossipsubRpc, MessageAcceptance, MessageId}; pub type IdentTopic = Topic; pub type Sha256Topic = Topic; diff --git a/protocols/gossipsub/src/peer_score/mod.rs b/protocols/gossipsub/src/peer_score/mod.rs index 3697b2aae75..15f476bc208 100644 --- a/protocols/gossipsub/src/peer_score/mod.rs +++ b/protocols/gossipsub/src/peer_score/mod.rs @@ -1,3 +1,24 @@ +// Copyright 2020 Sigma Prime Pty Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! //! Manages and stores the Scoring logic of a particular peer on the gossipsub behaviour. use crate::time_cache::TimeCache; @@ -631,6 +652,7 @@ impl PeerScore { } } + /// Sets scoring parameters for a topic. pub fn set_topic_params(&mut self, topic_hash: TopicHash, params: TopicScoreParams) { self.params.topics.insert(topic_hash, params); } @@ -644,7 +666,7 @@ impl PeerScore { peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params) { debug!( - "Peer {} delivered an invalid messag in topic {} and gets penalized \ + "Peer {} delivered an invalid message in topic {} and gets penalized \ for it", peer_id, topic_hash ); diff --git a/protocols/gossipsub/src/peer_score/params.rs b/protocols/gossipsub/src/peer_score/params.rs index 856185da946..59194ad3569 100644 --- a/protocols/gossipsub/src/peer_score/params.rs +++ b/protocols/gossipsub/src/peer_score/params.rs @@ -1,3 +1,23 @@ +// Copyright 2020 Sigma Prime Pty Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + use crate::TopicHash; use std::collections::{HashMap, HashSet}; use std::net::IpAddr; @@ -8,66 +28,6 @@ const DEFAULT_DECAY_INTERVAL: u64 = 1; /// The default rate to decay to 0. const DEFAULT_DECAY_TO_ZERO: f64 = 0.1; -// TODO: Adjust these defaults -impl Default for TopicScoreParams { - fn default() -> Self { - TopicScoreParams { - topic_weight: 0.5, - // P1 - time_in_mesh_weight: 1.0, - time_in_mesh_quantum: Duration::from_millis(1), - time_in_mesh_cap: 3600.0, - // P2 - first_message_deliveries_weight: 1.0, - first_message_deliveries_decay: 0.5, - first_message_deliveries_cap: 2000.0, - // P3 - mesh_message_deliveries_weight: -1.0, - mesh_message_deliveries_decay: 0.5, - mesh_message_deliveries_cap: 100.0, - mesh_message_deliveries_threshold: 20.0, - mesh_message_deliveries_window: Duration::from_millis(10), - mesh_message_deliveries_activation: Duration::from_secs(5), - // P3b - mesh_failure_penalty_weight: -1.0, - mesh_failure_penalty_decay: 0.5, - // P4 - invalid_message_deliveries_weight: -1.0, - invalid_message_deliveries_decay: 0.3, - } - } -} - -impl Default for PeerScoreThresholds { - fn default() -> Self { - PeerScoreThresholds { - gossip_threshold: -10.0, - publish_threshold: -50.0, - graylist_threshold: -80.0, - accept_px_threshold: 10.0, - opportunistic_graft_threshold: 20.0, - } - } -} - -impl Default for PeerScoreParams { - fn default() -> Self { - PeerScoreParams { - topics: HashMap::new(), - topic_score_cap: 3600.0, - app_specific_weight: 10.0, - ip_colocation_factor_weight: -5.0, - ip_colocation_factor_threshold: 10.0, - ip_colocation_factor_whitelist: HashSet::new(), - behaviour_penalty_weight: -10.0, - behaviour_penalty_decay: 0.2, - decay_interval: Duration::from_secs(DEFAULT_DECAY_INTERVAL), - decay_to_zero: DEFAULT_DECAY_TO_ZERO, - retain_score: Duration::from_secs(3600), - } - } -} - /// Computes the decay factor for a parameter, assuming the `decay_interval` is 1s /// and that the value decays to zero if it drops below 0.01. pub fn score_parameter_decay(decay: Duration) -> f64 { @@ -109,6 +69,18 @@ pub struct PeerScoreThresholds { pub opportunistic_graft_threshold: f64, } +impl Default for PeerScoreThresholds { + fn default() -> Self { + PeerScoreThresholds { + gossip_threshold: -10.0, + publish_threshold: -50.0, + graylist_threshold: -80.0, + accept_px_threshold: 10.0, + opportunistic_graft_threshold: 20.0, + } + } +} + impl PeerScoreThresholds { pub fn validate(&self) -> Result<(), &'static str> { if self.gossip_threshold > 0f64 { @@ -175,6 +147,24 @@ pub struct PeerScoreParams { pub retain_score: Duration, } +impl Default for PeerScoreParams { + fn default() -> Self { + PeerScoreParams { + topics: HashMap::new(), + topic_score_cap: 3600.0, + app_specific_weight: 10.0, + ip_colocation_factor_weight: -5.0, + ip_colocation_factor_threshold: 10.0, + ip_colocation_factor_whitelist: HashSet::new(), + behaviour_penalty_weight: -10.0, + behaviour_penalty_decay: 0.2, + decay_interval: Duration::from_secs(DEFAULT_DECAY_INTERVAL), + decay_to_zero: DEFAULT_DECAY_TO_ZERO, + retain_score: Duration::from_secs(3600), + } + } +} + /// Peer score parameter validation impl PeerScoreParams { pub fn validate(&self) -> Result<(), String> { @@ -286,6 +276,37 @@ pub struct TopicScoreParams { pub invalid_message_deliveries_decay: f64, } +/// NOTE: The topic score parameters are very network specific. +/// For any production system, these values should be manually set. +impl Default for TopicScoreParams { + fn default() -> Self { + TopicScoreParams { + topic_weight: 0.5, + // P1 + time_in_mesh_weight: 1.0, + time_in_mesh_quantum: Duration::from_millis(1), + time_in_mesh_cap: 3600.0, + // P2 + first_message_deliveries_weight: 1.0, + first_message_deliveries_decay: 0.5, + first_message_deliveries_cap: 2000.0, + // P3 + mesh_message_deliveries_weight: -1.0, + mesh_message_deliveries_decay: 0.5, + mesh_message_deliveries_cap: 100.0, + mesh_message_deliveries_threshold: 20.0, + mesh_message_deliveries_window: Duration::from_millis(10), + mesh_message_deliveries_activation: Duration::from_secs(5), + // P3b + mesh_failure_penalty_weight: -1.0, + mesh_failure_penalty_decay: 0.5, + // P4 + invalid_message_deliveries_weight: -1.0, + invalid_message_deliveries_decay: 0.3, + } + } +} + impl TopicScoreParams { pub fn validate(&self) -> Result<(), &'static str> { // make sure we have a sane topic weight diff --git a/protocols/gossipsub/src/peer_score/tests.rs b/protocols/gossipsub/src/peer_score/tests.rs index e5750b58d3e..b583df26f7b 100644 --- a/protocols/gossipsub/src/peer_score/tests.rs +++ b/protocols/gossipsub/src/peer_score/tests.rs @@ -1,3 +1,23 @@ +// Copyright 2020 Sigma Prime Pty Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + /// A collection of unit tests mostly ported from the go implementation. use super::*; diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index a7876ce52d6..902b9df3edc 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -673,7 +673,8 @@ mod tests { let gs = Gossipsub::new( crate::MessageAuthenticity::Signed(keypair.0.clone()), config, - ); + ) + .unwrap(); let data = (0..g.gen_range(1, 1024)).map(|_| g.gen()).collect(); let topics = Vec::arbitrary(g) .into_iter() diff --git a/protocols/gossipsub/src/time_cache.rs b/protocols/gossipsub/src/time_cache.rs index 8446dffda40..3b2c4ce3e45 100644 --- a/protocols/gossipsub/src/time_cache.rs +++ b/protocols/gossipsub/src/time_cache.rs @@ -1,3 +1,23 @@ +// Copyright 2020 Sigma Prime Pty Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + ///! This implements a time-based LRU cache for checking gossipsub message duplicates. use fnv::FnvHashMap; use std::collections::hash_map::{ diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index 6c53d813a49..ce30400f83e 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -1,8 +1,40 @@ +// Copyright 2020 Sigma Prime Pty Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + //! A collection of types using the Gossipsub system. use crate::TopicHash; use libp2p_core::PeerId; use std::fmt; +#[derive(Debug)] +/// Validation kinds from the application for received messages. +pub enum MessageAcceptance { + /// The message is considered valid, and it should be delivered and forwarded to the network. + Accept, + /// The message is considered invalid, and it should be rejected and trigger the P₄ penalty. + Reject, + /// The message is neither delivered nor forwarded to the network, but the router does not + /// trigger the P₄ penalty. + Ignore, +} + /// A type for gossipsub message ids. #[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] pub struct MessageId(pub Vec); diff --git a/protocols/gossipsub/tests/smoke.rs b/protocols/gossipsub/tests/smoke.rs index 86453c7ba7c..44a1863c837 100644 --- a/protocols/gossipsub/tests/smoke.rs +++ b/protocols/gossipsub/tests/smoke.rs @@ -182,7 +182,7 @@ fn build_node() -> (Multiaddr, Swarm) { .validation_mode(ValidationMode::Permissive) .build() .unwrap(); - let behaviour = Gossipsub::new(MessageAuthenticity::Author(peer_id.clone()), config); + let behaviour = Gossipsub::new(MessageAuthenticity::Author(peer_id.clone()), config).unwrap(); let mut swarm = Swarm::new(transport, behaviour, peer_id); let port = 1 + random::();