diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index b18081ad2dd..21bd6f7bcb8 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -18,9 +18,10 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use std::cmp::max; +use std::cmp::{max, Ordering}; use std::collections::hash_map::Entry; use std::iter::FromIterator; +use std::net::IpAddr; use std::time::Duration; use std::{ collections::HashSet, @@ -40,8 +41,8 @@ use rand::{seq::SliceRandom, thread_rng}; use wasm_timer::{Instant, Interval}; use libp2p_core::{ - connection::ConnectionId, identity::error::SigningError, identity::Keypair, ConnectedPoint, - Multiaddr, PeerId, + connection::ConnectionId, identity::error::SigningError, identity::Keypair, + multiaddr::Protocol::Ip4, multiaddr::Protocol::Ip6, ConnectedPoint, Multiaddr, PeerId, }; use libp2p_swarm::{ DialPeerCondition, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, @@ -52,6 +53,7 @@ use crate::config::{GossipsubConfig, ValidationMode}; use crate::error::PublishError; use crate::handler::GossipsubHandler; use crate::mcache::MessageCache; +use crate::peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectMsg}; use crate::protocol::{ GossipsubControlAction, GossipsubMessage, GossipsubSubscription, GossipsubSubscriptionAction, MessageId, PeerInfo, SIGNING_PREFIX, @@ -121,6 +123,16 @@ enum PublishConfig { Anonymous, } +impl PublishConfig { + pub fn get_own_id(&self) -> Option<&PeerId> { + match self { + Self::Signing { author, .. } => Some(&author), + Self::Author(author) => Some(&author), + _ => None, + } + } +} + impl From for PublishConfig { fn from(authenticity: MessageAuthenticity) -> Self { match authenticity { @@ -250,23 +262,18 @@ impl BackoffStorage { .map_or(false, |m| m.contains_key(peer)) } - /// Checks if a given peer is backoffed for the given topic. This method ignores BACKOFF_SLACK. - /// This method should be used for deciding if an incoming GRAFT is allowed. - /// This method returns true exactly if the backoff time is over. - pub fn is_backoff(&self, topic: &TopicHash, peer: &PeerId) -> bool { - Self::is_backoff_from_backoffs(&self.backoffs, topic, peer, Duration::from_secs(0)) + pub fn get_backoff_time(&self, topic: &TopicHash, peer: &PeerId) -> Option { + Self::get_backoff_time_from_backoffs(&self.backoffs, topic, peer) } - fn is_backoff_from_backoffs( + fn get_backoff_time_from_backoffs( backoffs: &HashMap>, topic: &TopicHash, peer: &PeerId, - slack: Duration, - ) -> bool { - backoffs.get(topic).map_or(false, |m| { - m.get(peer) - .map_or(false, |(i, _)| *i + slack > Instant::now()) - }) + ) -> Option { + backoffs + .get(topic) + .and_then(|m| m.get(peer).map(|(i, _)| *i)) } /// Applies a heartbeat. 
That should be called regularly in intervals of length @@ -279,8 +286,12 @@ impl BackoffStorage { if let Some(s) = self.backoffs_by_heartbeat.get_mut(self.heartbeat_index) { let backoffs = &mut self.backoffs; let slack = self.heartbeat_interval * self.backoff_slack; + let now = Instant::now(); s.retain(|(topic, peer)| { - let keep = Self::is_backoff_from_backoffs(backoffs, topic, peer, slack); + let keep = match Self::get_backoff_time_from_backoffs(backoffs, topic, peer) { + Some(backoff_time) => backoff_time + slack > now, + None => false, + }; if !keep { //remove from backoffs if let Entry::Occupied(mut m) = backoffs.entry(topic.clone()) { @@ -299,6 +310,16 @@ impl BackoffStorage { } } +pub enum MessageAcceptance { + /// The message is considered valid, and it should be delivered and forwarded to the network + Accept, + /// The message is considered invalid, and it should be rejected and trigger the P₄ penalty + Reject, + /// The message is neither delivered nor forwarded to the network, but the router does not + /// trigger the P₄ penalty. + Ignore, +} + /// Network behaviour that handles the gossipsub protocol. /// /// NOTE: Initialisation requires a [`MessageAuthenticity`] and [`GossipsubConfig`] instance. If message signing is @@ -361,6 +382,9 @@ pub struct Gossipsub { /// Set of connected outbound peers (we only consider true outbound peers found through /// discovery and not by px) outbound_peers: HashSet, + + /// stores optional peer score data together with thresholds and decay interval + peer_score: Option<(PeerScore, PeerScoreThresholds, Interval)>, } impl Gossipsub { @@ -403,6 +427,7 @@ impl Gossipsub { config, px_peers: HashSet::new(), outbound_peers: HashSet::new(), + peer_score: None, } } @@ -522,37 +547,61 @@ impl Gossipsub { let mut recipient_peers = HashSet::new(); for topic_hash in &message.topics { - if self.config.flood_publish() { - //forward to all peers above score - if let Some(set) = self.topic_peers.get(&topic_hash) { - recipient_peers.extend(set.iter().map(|p| p.clone())); - //TODO filter out peers with too low score that are not explicit peers + if let Some(set) = self.topic_peers.get(&topic_hash) { + if self.config.flood_publish() { + //forward to all peers above score + recipient_peers.extend( + set.iter() + .filter(|p| { + self.explicit_peers.contains(*p) + || !self.score_below_threshold(*p, |ts| ts.publish_threshold).0 + }) + .map(|p| p.clone()), + ); + continue; } - } else if self.mesh.get(&topic_hash).is_none() { - debug!("Topic: {:?} not in the mesh", topic_hash); - // Build a list of peers to forward the message to - // if we have fanout peers add them to the map. - if self.fanout.contains_key(&topic_hash) { - for peer in self.fanout.get(&topic_hash).expect("Topic must exist") { + + //explicit peers + for peer in &self.explicit_peers { + if set.contains(peer) { recipient_peers.insert(peer.clone()); } - } else { - // we have no fanout peers, select mesh_n of them and add them to the fanout - let mesh_n = self.config.mesh_n(); - let new_peers = - Self::get_random_peers(&self.topic_peers, &topic_hash, mesh_n, { - |_| true - }); - // add the new peers to the fanout and recipient peers - self.fanout.insert(topic_hash.clone(), new_peers.clone()); - for peer in new_peers { - debug!("Peer added to fanout: {:?}", peer); - recipient_peers.insert(peer.clone()); + } + + //TODO floodsub peers? 
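// (with scoring enabled, only explicit peers are exempt from the publish_threshold
//  check: the flood_publish branch above filters on it, and the fresh fanout peers
//  selected below are filtered on it as well)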
+ + //gossipsub peers + if self.mesh.get(&topic_hash).is_none() { + debug!("Topic: {:?} not in the mesh", topic_hash); + // Build a list of peers to forward the message to + // if we have fanout peers add them to the map. + if self.fanout.contains_key(&topic_hash) { + for peer in self.fanout.get(&topic_hash).expect("Topic must exist") { + recipient_peers.insert(peer.clone()); + } + } else { + // we have no fanout peers, select mesh_n of them and add them to the fanout + let mesh_n = self.config.mesh_n(); + let new_peers = + Self::get_random_peers(&self.topic_peers, &topic_hash, mesh_n, { + |p| { + !self.explicit_peers.contains(p) + && !self + .score_below_threshold(p, |ts| ts.publish_threshold) + .0 + } + }); + // add the new peers to the fanout and recipient peers + self.fanout.insert(topic_hash.clone(), new_peers.clone()); + for peer in new_peers { + debug!("Peer added to fanout: {:?}", peer); + recipient_peers.insert(peer.clone()); + } } + // we are publishing to fanout peers - update the time we published + self.fanout_last_pub + .insert(topic_hash.clone(), Instant::now()); } - // we are publishing to fanout peers - update the time we published - self.fanout_last_pub - .insert(topic_hash.clone(), Instant::now()); } } @@ -575,35 +624,60 @@ impl Gossipsub { Ok(()) } - /// This function should be called when `config.validate_messages()` is `true` in order to - /// validate and propagate messages. Messages are stored in the ['Memcache'] and validation is expected to be - /// fast enough that the messages should still exist in the cache. + /// This function should be called when `config.validate_messages()` is `true` after the + /// message got validated by the caller. Messages are stored in the ['Memcache'] and validation + /// is expected to be fast enough that the messages should still exist in the cache.There are + /// three possible validation outcomes and the outcome is given in acceptance. /// - /// Calling this function will propagate a message stored in the cache, if it still exists. - /// If the message still exists in the cache, it will be forwarded and this function will return true, - /// otherwise it will return false. + /// If acceptance = Accept the message will get propagated to the network. The + /// `propagation_source` parameter indicates who the message was received by and will not + /// be forwarded back to that peer. /// - /// The `propagation_source` parameter indicates who the message was received by and will not - /// be forwarded back to that peer. + /// If acceptance = Reject the message will be deleted from the memcache and the P₄ penalty + /// will be applied to the `propagation_source`. + /// + /// If acceptance = Ignore the message will be deleted from the memcache but no P₄ penalty + /// will be applied. + /// + /// This function will return true if the message was found in the cache and false if was not + /// in the cache anymore. /// /// This should only be called once per message. pub fn validate_message( &mut self, message_id: &MessageId, propagation_source: &PeerId, + acceptance: MessageAcceptance, ) -> bool { - let message = match self.mcache.validate(message_id) { - Some(message) => message.clone(), - None => { - warn!( - "Message not in cache. Ignoring forwarding. Message Id: {}", - message_id - ); - return false; + let reject_reason = match acceptance { + MessageAcceptance::Accept => { + let message = match self.mcache.validate(message_id) { + Some(message) => message.clone(), + None => { + warn!( + "Message not in cache. Ignoring forwarding. 
Message Id: {}", + message_id + ); + return false; + } + }; + self.forward_msg(message, Some(propagation_source)); + return true; } + MessageAcceptance::Reject => RejectMsg::ValidationFailed, + MessageAcceptance::Ignore => RejectMsg::ValidationIgnored, }; - self.forward_msg(message, Some(propagation_source)); - true + + if let Some(message) = self.mcache.remove(message_id) { + //tell peer_score that message should be ignored + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.reject_message(propagation_source, &message, reject_reason); + } + true + } else { + warn!("Rejected message not in cache. Message Id: {}", message_id); + false + } } /// Adds a new peer to the list of explicitly connected peers. @@ -623,6 +697,33 @@ impl Gossipsub { self.explicit_peers.remove(peer_id); } + /// Activates the peer scoring system with the given parameters. This will reset all scores + /// if there was already another peer scoring system activated. Returns an error if the + /// params where not valid. + pub fn with_peer_score( + &mut self, + params: PeerScoreParams, + threshold: PeerScoreThresholds, + ) -> Result<(), String> { + params.validate()?; + threshold.validate()?; + + let interval = Interval::new(params.decay_interval); + let peer_score = PeerScore::new(params, self.config.message_id_fn()); + self.peer_score = Some((peer_score, threshold, interval)); + Ok(()) + } + + /// Sets the application specific score for a peer. Returns true if scoring is active and + /// the peer is connected or if the score of the peer is not yet expired, false otherwise. + pub fn set_application_score(&mut self, peer_id: &PeerId, new_score: f64) -> bool { + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.set_application_score(peer_id, new_score) + } else { + false + } + } + /// Gossipsub JOIN(topic) - adds topic peers to mesh and sends them GRAFT messages. fn join(&mut self, topic_hash: &TopicHash) { debug!("Running JOIN for topic: {:?}", topic_hash); @@ -633,7 +734,7 @@ impl Gossipsub { return; } - let mut added_peers = vec![]; + let mut added_peers = HashSet::new(); // check if we have mesh_n peers in fanout[topic] and add them to the mesh if we do, // removing the fanout entry. 
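`with_peer_score`, `set_application_score` and `validate_message` together form the application-facing surface of the scoring feature introduced in this diff. A minimal usage sketch follows; it is not part of the diff itself, and `gs`, `peer_id`, `message_id`, `propagation_source` and `message_is_valid` are hypothetical placeholders supplied by the application:

// Activate scoring. The tests in this diff pass the default params/thresholds and
// unwrap the result, so the defaults are assumed to pass validation.
let params = PeerScoreParams::default();
let thresholds = PeerScoreThresholds::default(); // fields such as gossip_threshold,
                                                 // publish_threshold, graylist_threshold
                                                 // and accept_px_threshold can be tuned
gs.with_peer_score(params, thresholds)
    .expect("valid scoring parameters");

// Feed the application-specific component of a peer's score.
gs.set_application_score(&peer_id, 1.0);

// With config.validate_messages() enabled, report the outcome of application-level
// validation for a received message: Accept forwards it, Reject drops it from the
// mcache and applies the P₄ penalty, Ignore drops it without the penalty.
let acceptance = if message_is_valid {
    MessageAcceptance::Accept
} else {
    MessageAcceptance::Reject
};
gs.validate_message(&message_id, &propagation_source, acceptance);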
@@ -642,10 +743,12 @@ impl Gossipsub { "JOIN: Removing peers from the fanout for topic: {:?}", topic_hash ); - //remove explicit peers + //remove explicit peers and peers with negative scores peers = peers .into_iter() - .filter(|p| !self.explicit_peers.contains(p)) + .filter(|p| { + !self.explicit_peers.contains(p) && !self.score_below_threshold(p, |_| 0.0).0 + }) .collect(); // add up to mesh_n of them them to the mesh @@ -671,7 +774,11 @@ impl Gossipsub { &self.topic_peers, topic_hash, self.config.mesh_n() - added_peers.len(), - |peer| !added_peers.contains(peer) && !self.explicit_peers.contains(peer), + |peer| { + !added_peers.contains(peer) + && !self.explicit_peers.contains(peer) + && !self.score_below_threshold(peer, |_| 0.0).0 + }, ); added_peers.extend(new_peers.clone()); // add them to the mesh @@ -689,6 +796,9 @@ impl Gossipsub { for peer_id in added_peers { // Send a GRAFT control message info!("JOIN: Sending Graft message to peer: {:?}", peer_id); + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.graft(&peer_id, topic_hash.clone()); + } Self::control_pool_add( &mut self.control_pool, peer_id.clone(), @@ -710,7 +820,7 @@ impl Gossipsub { //select peers for peer exchange let peers = if do_px { Self::get_random_peers(&self.topic_peers, &topic_hash, self.config.prune_peers(), { - |p| p != peer //TODO score threshold + |p| p != peer && !self.score_below_threshold(p, |_| 0.0).0 }) .into_iter() .map(|p| PeerInfo { peer: Some(p) }) @@ -723,6 +833,9 @@ impl Gossipsub { self.backoffs .update_backoff(topic_hash, peer, self.config.prune_backoff()); + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.prune(peer, topic_hash.clone()); + } GossipsubControlAction::Prune { topic_hash: topic_hash.clone(), peers, @@ -758,10 +871,36 @@ impl Gossipsub { } } + fn score_below_threshold( + &self, + peer_id: &PeerId, + threshold: impl Fn(&PeerScoreThresholds) -> f64, + ) -> (bool, f64) { + if let Some((peer_score, thresholds, ..)) = &self.peer_score { + let score = peer_score.score(peer_id); + if score < threshold(thresholds) { + return (true, score); + } + (false, score) + } else { + (false, 0.0) + } + } + /// Handles an IHAVE control message. Checks our cache of messages. If the message is unknown, /// requests it with an IWANT control message. fn handle_ihave(&mut self, peer_id: &PeerId, ihave_msgs: Vec<(TopicHash, Vec)>) { + // we ignore IHAVE gossip from any peer whose score is below the gossip threshold + if let (true, score) = self.score_below_threshold(peer_id, |ts| ts.gossip_threshold) { + debug!( + "IHAVE: ignoring peer {:?} with score below threshold [score = {}]", + peer_id, score + ); + return; + } + debug!("Handling IHAVE for peer: {:?}", peer_id); + // use a hashset to avoid duplicates efficiently let mut iwant_ids = HashSet::new(); @@ -800,6 +939,15 @@ impl Gossipsub { /// Handles an IWANT control message. Checks our cache of messages. If the message exists it is /// forwarded to the requesting peer. 
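// With scoring active, this file gates several paths on the configured thresholds:
// IHAVE and IWANT gossip from peers scoring below `gossip_threshold` is ignored,
// publishing skips peers below `publish_threshold`, PX from a PRUNE is only honoured
// if the pruning peer scores at or above `accept_px_threshold`, and whole RPCs from
// peers below `graylist_threshold` are dropped in `inject_event`.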
fn handle_iwant(&mut self, peer_id: &PeerId, iwant_msgs: Vec) { + // we ignore IWANT gossip from any peer whose score is below the gossip threshold + if let (true, score) = self.score_below_threshold(peer_id, |ts| ts.gossip_threshold) { + debug!( + "IWANT: ignoring peer {:?} with score below threshold [score = {}]", + peer_id, score + ); + return; + } + debug!("Handling IWANT for peer: {:?}", peer_id); // build a hashmap of available messages let mut cached_messages = HashMap::new(); @@ -844,6 +992,8 @@ impl Gossipsub { // but don't PX do_px = false } else { + let (below_zero, score) = self.score_below_threshold(peer_id, |_| 0.0); + let now = Instant::now(); for topic_hash in topics { if let Some(peers) = self.mesh.get_mut(&topic_hash) { //if the peer is already in the mesh ignore the graft @@ -852,20 +1002,46 @@ impl Gossipsub { } // make sure we are not backing off that peer - if self.backoffs.is_backoff(&topic_hash, peer_id) { - debug!("GRAFT: ignoring backed off peer {:?}", peer_id); - //TODO add penalty - //no PX - do_px = false; - //TODO extra penalty if the graft is coming too fast (see - // GossipSubGraftFloodThreshold) + if let Some(backoff_time) = self.backoffs.get_backoff_time(&topic_hash, peer_id) + { + if backoff_time > now { + debug!("GRAFT: ignoring backed off peer {:?}", peer_id); + //add behavioural penalty + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.add_penalty(peer_id, 1); + + // check the flood cutoff + let flood_cutoff = backoff_time + + self.config.graft_flood_threshold() + - self.config.prune_backoff(); + if flood_cutoff > now { + //extra penalty + peer_score.add_penalty(peer_id, 1); + } + } + //no PX + do_px = false; + + to_prune_topics.insert(topic_hash.clone()); + continue; + } + } + //check the score + if below_zero { + // we don't GRAFT peers with negative score + debug!( + "GRAFT: ignoring peer {:?} with negative score [score = {}, \ + topic = {}]", + peer_id, score, topic_hash + ); + // we do send them PRUNE however, because it's a matter of protocol correctness to_prune_topics.insert(topic_hash.clone()); + // but we won't PX to them + do_px = false; continue; } - //TODO check score of peer - //check mesh upper bound and only allow graft if upperbound not reached or //if it is an outbound peer if peers.len() >= self.config.mesh_n_high() @@ -878,9 +1054,13 @@ impl Gossipsub { // add peer to the mesh info!( "GRAFT: Mesh link added for peer: {:?} in topic: {:?}", - peer_id, topic_hash + peer_id, &topic_hash ); peers.insert(peer_id.clone()); + + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.graft(peer_id, topic_hash); + } } else { //TODO spam hardening as in go implementation? to_prune_topics.insert(topic_hash.clone()); @@ -918,6 +1098,8 @@ impl Gossipsub { prune_data: Vec<(TopicHash, Vec, Option)>, ) { debug!("Handling PRUNE message for peer: {}", peer_id.to_string()); + let (below_threshold, score) = + self.score_below_threshold(peer_id, |ts| ts.accept_px_threshold); for (topic_hash, px, backoff) in prune_data { if let Some(peers) = self.mesh.get_mut(&topic_hash) { // remove the peer if it exists in the mesh @@ -929,6 +1111,10 @@ impl Gossipsub { ); } + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.prune(peer_id, topic_hash.clone()); + } + // is there a backoff specified by the peer? if so obey it. 
self.backoffs.update_backoff( &topic_hash, @@ -942,7 +1128,16 @@ impl Gossipsub { //connect to px peers if !px.is_empty() { - //TODO check score threshold before connecting + // we ignore PX from peers with insufficient score + if below_threshold { + debug!( + "PRUNE: ignoring PX from peer {:?} with insufficient score \ + [score ={} topic = {}]", + peer_id, score, topic_hash + ); + continue; + } + self.px_connect(px); } } @@ -995,11 +1190,34 @@ impl Gossipsub { msg.validated = true; } + // reject messages claiming to be from ourselves but not locally published + if let Some(own_id) = self.publish_info.get_own_id() { + if own_id != propagation_source && msg.source.as_ref().map_or(false, |s| s == own_id) { + debug!( + "Dropping message claiming to be from self but forwarded from {:?}", + propagation_source + ); + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.reject_message(propagation_source, &msg, RejectMsg::SelfOrigin); + } + return; + } + } + // Add the message to the duplication cache and memcache. if self.duplication_cache.insert(msg_id.clone(), ()).is_some() { debug!("Message already received, ignoring. Message: {:?}", msg_id); + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.duplicated_message(propagation_source, &msg); + } return; } + + //tells score that message arrived (but is maybe not fully validated yet) + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.validate_message(propagation_source, &msg); + } + self.mcache.put(msg.clone()); // dispatch the message to the user @@ -1082,6 +1300,12 @@ impl Gossipsub { propagation_source.to_string(), subscription.topic_hash ); + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.graft( + propagation_source, + subscription.topic_hash.clone(), + ); + } grafts.push(GossipsubControlAction::Graft { topic_hash: subscription.topic_hash.clone(), }); @@ -1156,10 +1380,15 @@ impl Gossipsub { let mut to_graft = HashMap::new(); let mut to_prune = HashMap::new(); + let mut no_px = HashSet::new(); //clean up expired backoffs self.backoffs.heartbeat(); + //TODO do we need to clear i have counters (see go) + + //TODO implement I want penalties (see go) + //check connections to explicit peers if self.heartbeat_ticks % self.config.check_explicit_peers_ticks() == 0 { for p in self.explicit_peers.clone() { @@ -1167,31 +1396,51 @@ impl Gossipsub { } } + //cache scores throughout the heartbeat + let mut scores = HashMap::new(); + let peer_score = &self.peer_score; + let mut score = |p: &PeerId| match peer_score { + Some((peer_score, ..)) => *scores + .entry(p.clone()) + .or_insert_with(|| peer_score.score(p)), + _ => 0.0, + }; + // maintain the mesh for each topic for (topic_hash, peers) in self.mesh.iter_mut() { let explicit_peers = &self.explicit_peers; let backoffs = &self.backoffs; let topic_peers = &self.topic_peers; - let mut insert_peers = - |peers: &mut BTreeSet, n, outbound_peers: Option<&HashSet>| { - let peer_list = Self::get_random_peers(topic_peers, topic_hash, n, |peer| { - !peers.contains(peer) - && !explicit_peers.contains(peer) - && !backoffs.is_backoff_with_slack(topic_hash, peer) - //TODO score filter - && match outbound_peers { - Some(outbound_peers) => outbound_peers.contains(peer), - None => true - } - }); - for peer in &peer_list { - let current_topic = to_graft.entry(peer.clone()).or_insert_with(Vec::new); + let outbound_peers = &self.outbound_peers; + + // drop all peers with negative score, without PX + // TODO can we make this more efficient? 
Unfortunately there is no retain method for + // BTreeSet (yet). + let to_remove: Vec<_> = peers + .iter() + .filter(|&p| { + if score(p) < 0.0 { + debug!( + "HEARTBEAT: Prune peer {:?} with negative score [score = {}, topic = \ + {}]", + p, + score(p), + topic_hash + ); + + let current_topic = to_prune.entry(p.clone()).or_insert_with(Vec::new); current_topic.push(topic_hash.clone()); + no_px.insert(p.clone()); + true + } else { + false } - // update the mesh - debug!("Updating mesh, new mesh: {:?}", peer_list); - peers.extend(peer_list); - }; + }) + .cloned() + .collect(); + for peer in to_remove { + peers.remove(&peer); + } // too little peers - add some if peers.len() < self.config.mesh_n_low() { @@ -1203,7 +1452,20 @@ impl Gossipsub { ); // not enough peers - get mesh_n - current_length more let desired_peers = self.config.mesh_n() - peers.len(); - insert_peers(peers, desired_peers, None); + let peer_list = + Self::get_random_peers(topic_peers, topic_hash, desired_peers, |peer| { + !peers.contains(peer) + && !explicit_peers.contains(peer) + && !backoffs.is_backoff_with_slack(topic_hash, peer) + && score(peer) >= 0.0 + }); + for peer in &peer_list { + let current_topic = to_graft.entry(peer.clone()).or_insert_with(Vec::new); + current_topic.push(topic_hash.clone()); + } + // update the mesh + debug!("Updating mesh, new mesh: {:?}", peer_list); + peers.extend(peer_list); } // too many peers - remove some @@ -1215,10 +1477,15 @@ impl Gossipsub { self.config.mesh_n_high() ); let excess_peer_no = peers.len() - self.config.mesh_n(); - // shuffle the peers + + // shuffle the peers and then sort by score ascending beginning with the worst let mut rng = thread_rng(); let mut shuffled = peers.iter().cloned().collect::>(); shuffled.shuffle(&mut rng); + shuffled + .sort_by(|p1, p2| score(p1).partial_cmp(&score(p2)).unwrap_or(Ordering::Equal)); + //shuffle everything except the last retain_scores many peers (the best ones) + shuffled[..peers.len() - self.config.retain_scores()].shuffle(&mut rng); //count total number of outbound peers let mut outbound = { @@ -1257,17 +1524,30 @@ impl Gossipsub { // do we have enough outbound peers? 
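// (candidates grafted here must be new to the mesh, non-explicit, not backed off,
//  have a non-negative score and be outbound — see the filter passed to
//  get_random_peers just below)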
if peers.len() >= self.config.mesh_n_low() { //count number of outbound peers we have - let outbound = { - let outbound_peers = &self.outbound_peers; - peers.iter().filter(|p| outbound_peers.contains(*p)).count() - }; + let outbound = { peers.iter().filter(|p| outbound_peers.contains(*p)).count() }; //if we have not enough outbound peers, graft to some new outbound peers if outbound < self.config.mesh_outbound_min() { let needed = self.config.mesh_outbound_min() - outbound; - insert_peers(peers, needed, Some(&self.outbound_peers)); + let peer_list = + Self::get_random_peers(topic_peers, topic_hash, needed, |peer| { + !peers.contains(peer) + && !explicit_peers.contains(peer) + && !backoffs.is_backoff_with_slack(topic_hash, peer) + && score(peer) >= 0.0 + && outbound_peers.contains(peer) + }); + for peer in &peer_list { + let current_topic = to_graft.entry(peer.clone()).or_insert_with(Vec::new); + current_topic.push(topic_hash.clone()); + } + // update the mesh + debug!("Updating mesh, new mesh: {:?}", peer_list); + peers.extend(peer_list); } } + + //TODO opportunistic grafting } // remove expired fanout topics @@ -1291,11 +1571,15 @@ impl Gossipsub { // check if our peers are still a part of the topic for (topic_hash, peers) in self.fanout.iter_mut() { let mut to_remove_peers = Vec::new(); + let publish_threshold = match &self.peer_score { + Some((_, thresholds, _)) => thresholds.publish_threshold, + _ => 0.0, + }; for peer in peers.iter() { // is the peer still subscribed to the topic? match self.peer_topics.get(peer) { Some(topics) => { - if !topics.contains(&topic_hash) { + if !topics.contains(&topic_hash) || score(peer) < publish_threshold { debug!( "HEARTBEAT: Peer removed from fanout for topic: {:?}", topic_hash @@ -1321,9 +1605,12 @@ impl Gossipsub { self.config.mesh_n() ); let needed_peers = self.config.mesh_n() - peers.len(); + let explicit_peers = &self.explicit_peers; let new_peers = Self::get_random_peers(&self.topic_peers, topic_hash, needed_peers, |peer| { !peers.contains(peer) + && !explicit_peers.contains(peer) + && score(peer) < publish_threshold }); peers.extend(new_peers); } @@ -1333,7 +1620,7 @@ impl Gossipsub { // send graft/prunes if !to_graft.is_empty() | !to_prune.is_empty() { - self.send_graft_prune(to_graft, to_prune); + self.send_graft_prune(to_graft, to_prune, no_px); } // piggyback pooled control messages @@ -1364,7 +1651,9 @@ impl Gossipsub { // get gossip_lazy random peers let to_msg_peers = Self::get_random_peers_dynamic(&self.topic_peers, &topic_hash, n_map, |peer| { - !peers.contains(peer) && !self.explicit_peers.contains(peer) + !peers.contains(peer) + && !self.explicit_peers.contains(peer) + && !self.score_below_threshold(peer, |ts| ts.gossip_threshold).0 }); debug!("Gossiping IHAVE to {} peers.", to_msg_peers.len()); @@ -1389,9 +1678,16 @@ impl Gossipsub { &mut self, to_graft: HashMap>, mut to_prune: HashMap>, + no_px: HashSet, ) { // handle the grafts and overlapping prunes per peer for (peer, topics) in to_graft.iter() { + for topic in topics { + //inform scoring of graft + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.graft(peer, topic.clone()); + } + } let mut control_msgs: Vec = topics .iter() .map(|topic_hash| GossipsubControlAction::Graft { @@ -1403,7 +1699,13 @@ impl Gossipsub { if let Some(topics) = to_prune.remove(peer) { let mut prunes = topics .iter() - .map(|topic_hash| self.make_prune(topic_hash, peer, self.config.do_px())) + .map(|topic_hash| { + self.make_prune( + topic_hash, + peer, + self.config.do_px() && 
!no_px.contains(peer), + ) + }) .collect::>(); control_msgs.append(&mut prunes); } @@ -1423,7 +1725,13 @@ impl Gossipsub { for (peer, topics) in to_prune.iter() { let remaining_prunes = topics .iter() - .map(|topic_hash| self.make_prune(topic_hash, peer, self.config.do_px())) + .map(|topic_hash| { + self.make_prune( + topic_hash, + peer, + self.config.do_px() && !no_px.contains(peer), + ) + }) .collect(); self.send_message( peer.clone(), @@ -1439,6 +1747,13 @@ impl Gossipsub { /// Helper function which forwards a message to mesh\[topic\] peers. /// Returns true if at least one peer was messaged. fn forward_msg(&mut self, message: GossipsubMessage, source: Option<&PeerId>) -> bool { + //message is fully validated, inform peer_score + if let Some((peer_score, ..)) = &mut self.peer_score { + if let Some(peer) = source { + peer_score.deliver_message(peer, &message); + } + } + let msg_id = (self.config.message_id_fn())(&message); debug!("Forwarding message: {:?}", msg_id); let mut recipient_peers = HashSet::new(); @@ -1658,6 +1973,21 @@ impl Gossipsub { } } +fn get_remote_addr(endpoint: &ConnectedPoint) -> &Multiaddr { + match endpoint { + ConnectedPoint::Dialer { address } => address, + ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr, + } +} + +fn get_ip_addr(addr: &Multiaddr) -> Option { + addr.iter().find_map(|p| match p { + Ip4(addr) => Some(IpAddr::V4(addr)), + Ip6(addr) => Some(IpAddr::V6(addr)), + _ => None, + }) +} + impl NetworkBehaviour for Gossipsub { type ProtocolsHandler = GossipsubHandler; type OutEvent = GossipsubEvent; @@ -1699,6 +2029,10 @@ impl NetworkBehaviour for Gossipsub { // For the time being assume all gossipsub peers self.peer_topics.insert(id.clone(), Default::default()); + + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.add_peer(id.clone()); + } } fn inject_disconnected(&mut self, id: &PeerId) { @@ -1746,6 +2080,10 @@ impl NetworkBehaviour for Gossipsub { // remove peer from peer_topics let was_in = self.peer_topics.remove(id); debug_assert!(was_in.is_some()); + + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.remove_peer(id); + } } fn inject_connection_established( @@ -1769,9 +2107,55 @@ impl NetworkBehaviour for Gossipsub { self.outbound_peers.insert(peer.clone()); } } + + //add ip to peer scoring system + if let Some((peer_score, ..)) = &mut self.peer_score { + if let Some(ip) = get_ip_addr(get_remote_addr(endpoint)) { + peer_score.add_ip(peer.clone(), ip); + } + } + } + + fn inject_connection_closed( + &mut self, + peer: &PeerId, + _: &ConnectionId, + endpoint: &ConnectedPoint, + ) { + //remove ip from peer scoring system + if let Some((peer_score, ..)) = &mut self.peer_score { + if let Some(ip) = get_ip_addr(get_remote_addr(endpoint)) { + peer_score.remove_ip(peer, &ip); + } + } + } + + fn inject_address_change( + &mut self, + peer: &PeerId, + _: &ConnectionId, + endpoint_old: &ConnectedPoint, + endpoint_new: &ConnectedPoint, + ) { + //exchange ip in peer scoring system + if let Some((peer_score, ..)) = &mut self.peer_score { + if let Some(ip) = get_ip_addr(get_remote_addr(endpoint_old)) { + peer_score.remove_ip(peer, &ip); + } + if let Some(ip) = get_ip_addr(get_remote_addr(endpoint_new)) { + peer_score.add_ip(peer.clone(), ip); + } + } } fn inject_event(&mut self, propagation_source: PeerId, _: ConnectionId, event: GossipsubRpc) { + //check if peer is graylisted in which case we ignore the event + if let (true, _) = + self.score_below_threshold(&propagation_source, |ts| ts.graylist_threshold) + { + 
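//the peer is graylisted: drop the whole RPC (messages, subscriptions and
//control messages) without processing it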
return; + } + // Handle subscriptions // Update connected peers topics if !event.subscriptions.is_empty() { @@ -1858,6 +2242,13 @@ impl NetworkBehaviour for Gossipsub { }); } + //update scores + if let Some((peer_score, _, interval)) = &mut self.peer_score { + while let Poll::Ready(Some(())) = interval.poll_next_unpin(cx) { + peer_score.refresh_scores(); + } + } + while let Poll::Ready(Some(())) = self.heartbeat.poll_next_unpin(cx) { self.heartbeat(); } diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 82fbe1b9646..0feb3e7ad3b 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -25,9 +25,11 @@ mod tests { use std::thread::sleep; use std::time::Duration; - use crate::{GossipsubConfigBuilder, IdentTopic as Topic}; + use crate::{GossipsubConfigBuilder, IdentTopic as Topic, TopicScoreParams}; use super::super::*; + use async_std::net::Ipv4Addr; + use rand::Rng; // helper functions for testing @@ -64,45 +66,14 @@ mod tests { gs_config: GossipsubConfig, explicit: usize, ) -> (Gossipsub, Vec, Vec) { - let keypair = libp2p_core::identity::Keypair::generate_secp256k1(); - // create a gossipsub struct - let mut gs: Gossipsub = Gossipsub::new(MessageAuthenticity::Signed(keypair), gs_config); - - let mut topic_hashes = vec![]; - - // subscribe to the topics - for t in topics { - let topic = Topic::new(t); - gs.subscribe(topic.clone()); - topic_hashes.push(topic.hash().clone()); - } - - // build and connect peer_no random peers - let mut peers = vec![]; - - for i in 0..peer_no { - let peer = PeerId::random(); - peers.push(peer.clone()); - ::inject_connected(&mut gs, &peer); - if i < explicit { - gs.add_explicit_peer(&peer); - } - if to_subscribe { - gs.handle_received_subscriptions( - &topic_hashes - .iter() - .cloned() - .map(|t| GossipsubSubscription { - action: GossipsubSubscriptionAction::Subscribe, - topic_hash: t, - }) - .collect::>(), - &peer, - ); - }; - } - - return (gs, peers, topic_hashes); + build_and_inject_nodes_with_config_and_explicit_and_outbound( + peer_no, + topics, + to_subscribe, + gs_config, + explicit, + 0, + ) } fn build_and_inject_nodes_with_config_and_explicit_and_outbound( @@ -112,11 +83,36 @@ mod tests { gs_config: GossipsubConfig, explicit: usize, outbound: usize, + ) -> (Gossipsub, Vec, Vec) { + build_and_inject_nodes_with_config_and_explicit_and_outbound_and_scoring( + peer_no, + topics, + to_subscribe, + gs_config, + explicit, + outbound, + None, + ) + } + + fn build_and_inject_nodes_with_config_and_explicit_and_outbound_and_scoring( + peer_no: usize, + topics: Vec, + to_subscribe: bool, + gs_config: GossipsubConfig, + explicit: usize, + outbound: usize, + scoring: Option<(PeerScoreParams, PeerScoreThresholds)>, ) -> (Gossipsub, Vec, Vec) { let keypair = libp2p_core::identity::Keypair::generate_secp256k1(); // create a gossipsub struct let mut gs: Gossipsub = Gossipsub::new(MessageAuthenticity::Signed(keypair), gs_config); + if let Some((scoring_params, scoring_thresholds)) = scoring { + gs.with_peer_score(scoring_params, scoring_thresholds) + .unwrap(); + } + let mut topic_hashes = vec![]; // subscribe to the topics @@ -129,40 +125,69 @@ mod tests { // build and connect peer_no random peers let mut peers = vec![]; + let empty = vec![]; for i in 0..peer_no { - let peer = PeerId::random(); - peers.push(peer.clone()); - if i < outbound { - gs.inject_connection_established( - &peer, - &ConnectionId::new(0), - &ConnectedPoint::Dialer { - address: 
Multiaddr::empty(), - }, - ); - } - ::inject_connected(&mut gs, &peer); - if i < explicit { - gs.add_explicit_peer(&peer); - } - if to_subscribe { - gs.handle_received_subscriptions( - &topic_hashes - .iter() - .cloned() - .map(|t| GossipsubSubscription { - action: GossipsubSubscriptionAction::Subscribe, - topic_hash: t, - }) - .collect::>(), - &peer, - ); - }; + peers.push(add_peer( + &mut gs, + if to_subscribe { &topic_hashes } else { &empty }, + i < outbound, + i < explicit, + )); } return (gs, peers, topic_hashes); } + fn add_peer( + gs: &mut Gossipsub, + topic_hashes: &Vec, + outbound: bool, + explicit: bool, + ) -> PeerId { + add_peer_with_addr(gs, topic_hashes, outbound, explicit, Multiaddr::empty()) + } + + fn add_peer_with_addr( + gs: &mut Gossipsub, + topic_hashes: &Vec, + outbound: bool, + explicit: bool, + address: Multiaddr, + ) -> PeerId { + let peer = PeerId::random(); + //peers.push(peer.clone()); + gs.inject_connection_established( + &peer, + &ConnectionId::new(0), + &if outbound { + ConnectedPoint::Dialer { address } + } else { + ConnectedPoint::Listener { + local_addr: Multiaddr::empty(), + send_back_addr: address, + } + }, + ); + ::inject_connected(gs, &peer); + if explicit { + gs.add_explicit_peer(&peer); + } + if !topic_hashes.is_empty() { + gs.handle_received_subscriptions( + &topic_hashes + .iter() + .cloned() + .map(|t| GossipsubSubscription { + action: GossipsubSubscriptionAction::Subscribe, + topic_hash: t, + }) + .collect::>(), + &peer, + ); + } + peer + } + #[test] /// Test local node subscribing to a topic fn test_subscribe() { @@ -1488,6 +1513,7 @@ mod tests { vec![(peers[0].clone(), vec![topics[0].clone()])] .into_iter() .collect(), + HashSet::new(), ); //check prune message @@ -1526,6 +1552,7 @@ mod tests { vec![(peers[0].clone(), vec![topics[0].clone()])] .into_iter() .collect(), + HashSet::new(), ); //ignore all messages until now @@ -1668,15 +1695,7 @@ mod tests { // subscribe an additional new peer to test2 gs.subscribe(other_topic.clone()); - let other_peer = PeerId::random(); - gs.inject_connected(&other_peer); - gs.handle_received_subscriptions( - &vec![GossipsubSubscription { - action: GossipsubSubscriptionAction::Subscribe, - topic_hash: other_topic.hash(), - }], - &other_peer, - ); + add_peer(&mut gs, &vec![other_topic.hash()], false, false); //publish message let publish_data = vec![0; 42]; @@ -1805,32 +1824,13 @@ mod tests { gs.handle_graft(&peer, topics.clone()); } - //create an outbound and an inbound peer - let inbound = PeerId::random(); - let outbound = PeerId::random(); - gs.inject_connection_established( - &outbound, - &ConnectionId::new(0), - &ConnectedPoint::Dialer { - address: Multiaddr::empty(), - }, - ); - - //inject_connected and subscription - for peer in &[&inbound, &outbound] { - gs.inject_connected(peer); - gs.handle_received_subscriptions( - &vec![GossipsubSubscription { - action: GossipsubSubscriptionAction::Subscribe, - topic_hash: topics[0].clone(), - }], - peer, - ); - } - //assert current mesh size assert_eq!(gs.mesh[&topics[0]].len(), config.mesh_n_high()); + //create an outbound and an inbound peer + let inbound = add_peer(&mut gs, &topics, false, false); + let outbound = add_peer(&mut gs, &topics, true, false); + //send grafts gs.handle_graft(&inbound, vec![topics[0].clone()]); gs.handle_graft(&outbound, vec![topics[0].clone()]); @@ -1870,23 +1870,8 @@ mod tests { //create m outbound connections and graft (we will accept the graft) let mut outbound = HashSet::new(); for _ in 0..m { - let peer = PeerId::random(); + let 
peer = add_peer(&mut gs, &topics, true, false); outbound.insert(peer.clone()); - gs.inject_connection_established( - &peer, - &ConnectionId::new(0), - &ConnectedPoint::Dialer { - address: Multiaddr::empty(), - }, - ); - gs.inject_connected(&peer); - gs.handle_received_subscriptions( - &vec![GossipsubSubscription { - action: GossipsubSubscriptionAction::Subscribe, - topic_hash: topics[0].clone(), - }], - &peer, - ); gs.handle_graft(&peer, topics.clone()); } @@ -1918,22 +1903,7 @@ mod tests { //create config.mesh_outbound_min() many outbound connections without grafting for _ in 0..config.mesh_outbound_min() { - let peer = PeerId::random(); - gs.inject_connection_established( - &peer, - &ConnectionId::new(0), - &ConnectedPoint::Dialer { - address: Multiaddr::empty(), - }, - ); - gs.inject_connected(&peer); - gs.handle_received_subscriptions( - &vec![GossipsubSubscription { - action: GossipsubSubscriptionAction::Subscribe, - topic_hash: topics[0].clone(), - }], - &peer, - ); + add_peer(&mut gs, &topics, true, false); } // Nothing changed in the mesh yet @@ -1954,4 +1924,1791 @@ mod tests { // `inject_connection_established` for the first connection is done before `inject_connected` // gets called. For all further connections `inject_connection_established` should get called // after `inject_connected`. + + #[test] + fn test_prune_negative_scored_peers() { + let config = GossipsubConfig::default(); + + //build mesh with one peer + let (mut gs, peers, topics) = + build_and_inject_nodes_with_config_and_explicit_and_outbound_and_scoring( + 1, + vec!["test".into()], + true, + config.clone(), + 0, + 0, + Some((PeerScoreParams::default(), PeerScoreThresholds::default())), + ); + + //add penalty to peer + gs.peer_score.as_mut().unwrap().0.add_penalty(&peers[0], 1); + + //execute heartbeat + gs.heartbeat(); + + //peer should not be in mesh anymore + assert!(gs.mesh[&topics[0]].is_empty()); + + //check prune message + assert_eq!( + count_control_msgs(&gs, |peer_id, m| peer_id == &peers[0] + && match m { + GossipsubControlAction::Prune { + topic_hash, + peers, + backoff, + } => + topic_hash == &topics[0] && + //no px in this case + peers.is_empty() && + backoff.unwrap() == config.prune_backoff().as_secs(), + _ => false, + }), + 1 + ); + } + + #[test] + fn test_dont_graft_to_negative_scored_peers() { + let config = GossipsubConfig::default(); + //init full mesh + let (mut gs, peers, topics) = + build_and_inject_nodes_with_config_and_explicit_and_outbound_and_scoring( + config.mesh_n_high(), + vec!["test".into()], + true, + config.clone(), + 0, + 0, + Some((PeerScoreParams::default(), PeerScoreThresholds::default())), + ); + + //add two additional peers that will not be part of the mesh + let p1 = add_peer(&mut gs, &topics, false, false); + let p2 = add_peer(&mut gs, &topics, false, false); + + //reduce score of p1 to negative + gs.peer_score.as_mut().unwrap().0.add_penalty(&p1, 1); + + //handle prunes of all other peers + for p in peers { + gs.handle_prune(&p, vec![(topics[0].clone(), Vec::new(), None)]); + } + + //heartbeat + gs.heartbeat(); + + //assert that mesh only contains p2 + assert_eq!(gs.mesh.get(&topics[0]).unwrap().len(), 1); + assert!(gs.mesh.get(&topics[0]).unwrap().contains(&p2)); + } + + ///Note that in this test also without a penalty the px would be ignored because of the + /// acceptPXThreshold, but the spec still explicitely states the rule that px from negative + /// peers should get ignored, therefore we test it here. 
+ #[test] + fn test_ignore_px_from_negative_scored_peer() { + let config = GossipsubConfig::default(); + + //build mesh with one peer + let (mut gs, peers, topics) = + build_and_inject_nodes_with_config_and_explicit_and_outbound_and_scoring( + 1, + vec!["test".into()], + true, + config.clone(), + 0, + 0, + Some((PeerScoreParams::default(), PeerScoreThresholds::default())), + ); + + //penalize peer + gs.peer_score.as_mut().unwrap().0.add_penalty(&peers[0], 1); + + //handle prune from single peer with px peers + let px = vec![PeerInfo { + peer: Some(PeerId::random()), + }]; + + gs.handle_prune( + &peers[0], + vec![( + topics[0].clone(), + px.clone(), + Some(config.prune_backoff().as_secs()), + )], + ); + + //assert no dials + assert_eq!( + gs.events + .iter() + .filter(|e| match e { + NetworkBehaviourAction::DialPeer { .. } => true, + _ => false, + }) + .count(), + 0 + ); + } + + #[test] + fn test_only_send_nonnegative_scoring_peers_in_px() { + let config = GossipsubConfig::default(); + + //build mesh with three peer + let (mut gs, peers, topics) = + build_and_inject_nodes_with_config_and_explicit_and_outbound_and_scoring( + 3, + vec!["test".into()], + true, + config.clone(), + 0, + 0, + Some((PeerScoreParams::default(), PeerScoreThresholds::default())), + ); + + //penalize first peer + gs.peer_score.as_mut().unwrap().0.add_penalty(&peers[0], 1); + + //prune second peer + gs.send_graft_prune( + HashMap::new(), + vec![(peers[1].clone(), vec![topics[0].clone()])] + .into_iter() + .collect(), + HashSet::new(), + ); + + //check that px in prune message only contains third peer + assert_eq!( + count_control_msgs(&gs, |peer_id, m| peer_id == &peers[1] + && match m { + GossipsubControlAction::Prune { + topic_hash, + peers: px, + .. + } => + topic_hash == &topics[0] + && px.len() == 1 + && px[0].peer.as_ref().unwrap() == &peers[2], + _ => false, + }), + 1 + ); + } + + #[test] + fn test_do_not_gossip_to_peers_below_gossip_threshold() { + let config = GossipsubConfig::default(); + let peer_score_params = PeerScoreParams::default(); + let mut peer_score_thresholds = PeerScoreThresholds::default(); + peer_score_thresholds.gossip_threshold = 3.0 * peer_score_params.behaviour_penalty_weight; + + //build full mesh + let (mut gs, peers, topics) = + build_and_inject_nodes_with_config_and_explicit_and_outbound_and_scoring( + config.mesh_n_high(), + vec!["test".into()], + true, + config.clone(), + 0, + 0, + Some((peer_score_params, peer_score_thresholds)), + ); + + // graft all the peer + for peer in peers { + gs.handle_graft(&peer, topics.clone()); + } + + //add two additional peers that will not be part of the mesh + let p1 = add_peer(&mut gs, &topics, false, false); + let p2 = add_peer(&mut gs, &topics, false, false); + + //reduce score of p1 below peer_score_thresholds.gossip_threshold + //note that penalties get squared so two penalties means a score of + // 4 * peer_score_params.behaviour_penalty_weight. 
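// (i.e. a counter of 2 contributes 2² * behaviour_penalty_weight = 4 * behaviour_penalty_weight;
//  since the weight is negative this falls below the 3 * behaviour_penalty_weight gossip
//  threshold configured above, while the single penalty given to p2 below does not)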
+ gs.peer_score.as_mut().unwrap().0.add_penalty(&p1, 2); + + //reduce score of p2 below 0 but not below peer_score_thresholds.gossip_threshold + gs.peer_score.as_mut().unwrap().0.add_penalty(&p2, 1); + + //receive message + let message = GossipsubMessage { + source: Some(PeerId::random()), + data: vec![], + sequence_number: Some(0), + topics: vec![topics[0].clone()], + signature: None, + key: None, + validated: true, + }; + gs.handle_received_message(message.clone(), &PeerId::random()); + + //emit gossip + gs.emit_gossip(); + + let msg_id = (gs.config.message_id_fn())(&message); + //check that exactly one gossip messages got sent and it got sent to p2 + assert_eq!( + count_control_msgs(&gs, |peer, action| match action { + GossipsubControlAction::IHave { + topic_hash, + message_ids, + } => { + if topic_hash == &topics[0] && message_ids.iter().any(|id| id == &msg_id) { + assert_eq!(peer, &p2); + true + } else { + false + } + } + _ => false, + }), + 1 + ); + } + + #[test] + fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() { + let config = GossipsubConfig::default(); + let peer_score_params = PeerScoreParams::default(); + let mut peer_score_thresholds = PeerScoreThresholds::default(); + peer_score_thresholds.gossip_threshold = 3.0 * peer_score_params.behaviour_penalty_weight; + + //build full mesh + let (mut gs, peers, topics) = + build_and_inject_nodes_with_config_and_explicit_and_outbound_and_scoring( + config.mesh_n_high(), + vec!["test".into()], + true, + config.clone(), + 0, + 0, + Some((peer_score_params, peer_score_thresholds)), + ); + + // graft all the peer + for peer in peers { + gs.handle_graft(&peer, topics.clone()); + } + + //add two additional peers that will not be part of the mesh + let p1 = add_peer(&mut gs, &topics, false, false); + let p2 = add_peer(&mut gs, &topics, false, false); + + //reduce score of p1 below peer_score_thresholds.gossip_threshold + //note that penalties get squared so two penalties means a score of + // 4 * peer_score_params.behaviour_penalty_weight. + gs.peer_score.as_mut().unwrap().0.add_penalty(&p1, 2); + + //reduce score of p2 below 0 but not below peer_score_thresholds.gossip_threshold + gs.peer_score.as_mut().unwrap().0.add_penalty(&p2, 1); + + //receive message + let message = GossipsubMessage { + source: Some(PeerId::random()), + data: vec![], + sequence_number: Some(0), + topics: vec![topics[0].clone()], + signature: None, + key: None, + validated: true, + }; + gs.handle_received_message(message.clone(), &PeerId::random()); + + let id = gs.config.message_id_fn(); + let msg_id = id(&message); + + gs.handle_iwant(&p1, vec![msg_id.clone()]); + gs.handle_iwant(&p2, vec![msg_id.clone()]); + + // the messages we are sending + let sent_messages = gs + .events + .iter() + .fold(vec![], |mut collected_messages, e| match e { + NetworkBehaviourAction::NotifyHandler { event, peer_id, .. 
} => { + for c in &event.messages { + collected_messages.push((peer_id.clone(), c.clone())) + } + collected_messages + } + _ => collected_messages, + }); + + //the message got sent to p2 + assert!(sent_messages + .iter() + .any(|(peer_id, msg)| peer_id == &p2 && &id(msg) == &msg_id)); + //the message got not sent to p1 + assert!(sent_messages + .iter() + .all(|(peer_id, msg)| !(peer_id == &p1 && &id(msg) == &msg_id))); + } + + #[test] + fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { + let config = GossipsubConfig::default(); + let peer_score_params = PeerScoreParams::default(); + let mut peer_score_thresholds = PeerScoreThresholds::default(); + peer_score_thresholds.gossip_threshold = 3.0 * peer_score_params.behaviour_penalty_weight; + + //build full mesh + let (mut gs, peers, topics) = + build_and_inject_nodes_with_config_and_explicit_and_outbound_and_scoring( + config.mesh_n_high(), + vec!["test".into()], + true, + config.clone(), + 0, + 0, + Some((peer_score_params, peer_score_thresholds)), + ); + + // graft all the peer + for peer in peers { + gs.handle_graft(&peer, topics.clone()); + } + + //add two additional peers that will not be part of the mesh + let p1 = add_peer(&mut gs, &topics, false, false); + let p2 = add_peer(&mut gs, &topics, false, false); + + //reduce score of p1 below peer_score_thresholds.gossip_threshold + //note that penalties get squared so two penalties means a score of + // 4 * peer_score_params.behaviour_penalty_weight. + gs.peer_score.as_mut().unwrap().0.add_penalty(&p1, 2); + + //reduce score of p2 below 0 but not below peer_score_thresholds.gossip_threshold + gs.peer_score.as_mut().unwrap().0.add_penalty(&p2, 1); + + //message that other peers have + let message = GossipsubMessage { + source: Some(PeerId::random()), + data: vec![], + sequence_number: Some(0), + topics: vec![topics[0].clone()], + signature: None, + key: None, + validated: true, + }; + + let id = gs.config.message_id_fn(); + let msg_id = id(&message); + + gs.handle_ihave(&p1, vec![(topics[0].clone(), vec![msg_id.clone()])]); + gs.handle_ihave(&p2, vec![(topics[0].clone(), vec![msg_id.clone()])]); + + // check that we sent exactly one IWANT request to p2 + assert_eq!( + count_control_msgs(&gs, |peer, c| match c { + GossipsubControlAction::IWant { message_ids } => + if message_ids.iter().any(|m| m == &msg_id) { + assert_eq!(peer, &p2); + true + } else { + false + }, + _ => false, + }), + 1 + ); + } + + #[test] + fn test_do_not_publish_to_peer_below_publish_threshold() { + let config = GossipsubConfigBuilder::new() + .flood_publish(false) + .build() + .unwrap(); + let peer_score_params = PeerScoreParams::default(); + let mut peer_score_thresholds = PeerScoreThresholds::default(); + peer_score_thresholds.gossip_threshold = 0.5 * peer_score_params.behaviour_penalty_weight; + peer_score_thresholds.publish_threshold = 3.0 * peer_score_params.behaviour_penalty_weight; + + //build mesh with no peers and no subscribed topics + let (mut gs, _, _) = + build_and_inject_nodes_with_config_and_explicit_and_outbound_and_scoring( + 0, + vec![], + true, + config.clone(), + 0, + 0, + Some((peer_score_params, peer_score_thresholds)), + ); + + //create a new topic for which we are not subscribed + let topic = Topic::new("test"); + let topics = vec![topic.hash()]; + + //add two additional peers that will be added to the mesh + let p1 = add_peer(&mut gs, &topics, false, false); + let p2 = add_peer(&mut gs, &topics, false, false); + + //reduce score of p1 below 
peer_score_thresholds.publish_threshold + //note that penalties get squared so two penalties means a score of + // 4 * peer_score_params.behaviour_penalty_weight. + gs.peer_score.as_mut().unwrap().0.add_penalty(&p1, 2); + + //reduce score of p2 below 0 but not below peer_score_thresholds.publish_threshold + gs.peer_score.as_mut().unwrap().0.add_penalty(&p2, 1); + + //a heartbeat will remove the peers from the mesh + gs.heartbeat(); + + // publish on topic + let publish_data = vec![0; 42]; + gs.publish(topic, publish_data).unwrap(); + + // Collect all publish messages + let publishes = gs + .events + .iter() + .fold(vec![], |mut collected_publish, e| match e { + NetworkBehaviourAction::NotifyHandler { event, peer_id, .. } => { + for s in &event.messages { + collected_publish.push((peer_id.clone(), s.clone())); + } + collected_publish + } + _ => collected_publish, + }); + + //assert only published to p2 + assert_eq!(publishes.len(), 1); + assert_eq!(publishes[0].0, p2); + } + + #[test] + fn test_do_not_flood_publish_to_peer_below_publish_threshold() { + let config = GossipsubConfig::default(); + let peer_score_params = PeerScoreParams::default(); + let mut peer_score_thresholds = PeerScoreThresholds::default(); + peer_score_thresholds.gossip_threshold = 0.5 * peer_score_params.behaviour_penalty_weight; + peer_score_thresholds.publish_threshold = 3.0 * peer_score_params.behaviour_penalty_weight; + + //build mesh with no peers + let (mut gs, _, topics) = + build_and_inject_nodes_with_config_and_explicit_and_outbound_and_scoring( + 0, + vec!["test".into()], + true, + config.clone(), + 0, + 0, + Some((peer_score_params, peer_score_thresholds)), + ); + + //add two additional peers that will be added to the mesh + let p1 = add_peer(&mut gs, &topics, false, false); + let p2 = add_peer(&mut gs, &topics, false, false); + + //reduce score of p1 below peer_score_thresholds.publish_threshold + //note that penalties get squared so two penalties means a score of + // 4 * peer_score_params.behaviour_penalty_weight. + gs.peer_score.as_mut().unwrap().0.add_penalty(&p1, 2); + + //reduce score of p2 below 0 but not below peer_score_thresholds.publish_threshold + gs.peer_score.as_mut().unwrap().0.add_penalty(&p2, 1); + + //a heartbeat will remove the peers from the mesh + gs.heartbeat(); + + // publish on topic + let publish_data = vec![0; 42]; + gs.publish(Topic::new("test"), publish_data).unwrap(); + + // Collect all publish messages + let publishes = gs + .events + .iter() + .fold(vec![], |mut collected_publish, e| match e { + NetworkBehaviourAction::NotifyHandler { event, peer_id, .. 
} => { + for s in &event.messages { + collected_publish.push((peer_id.clone(), s.clone())); + } + collected_publish + } + _ => collected_publish, + }); + + //assert only published to p2 + assert_eq!(publishes.len(), 1); + assert!(publishes[0].0 == p2); + } + + #[test] + fn test_ignore_rpc_from_peers_below_graylist_threshold() { + let config = GossipsubConfig::default(); + let peer_score_params = PeerScoreParams::default(); + let mut peer_score_thresholds = PeerScoreThresholds::default(); + peer_score_thresholds.gossip_threshold = 0.5 * peer_score_params.behaviour_penalty_weight; + peer_score_thresholds.publish_threshold = 0.5 * peer_score_params.behaviour_penalty_weight; + peer_score_thresholds.graylist_threshold = 3.0 * peer_score_params.behaviour_penalty_weight; + + //build mesh with no peers + let (mut gs, _, topics) = + build_and_inject_nodes_with_config_and_explicit_and_outbound_and_scoring( + 0, + vec!["test".into()], + true, + config.clone(), + 0, + 0, + Some((peer_score_params, peer_score_thresholds)), + ); + + //add two additional peers that will be added to the mesh + let p1 = add_peer(&mut gs, &topics, false, false); + let p2 = add_peer(&mut gs, &topics, false, false); + + //reduce score of p1 below peer_score_thresholds.graylist_threshold + //note that penalties get squared so two penalties means a score of + // 4 * peer_score_params.behaviour_penalty_weight. + gs.peer_score.as_mut().unwrap().0.add_penalty(&p1, 2); + + //reduce score of p2 below publish_threshold but not below graylist_threshold + gs.peer_score.as_mut().unwrap().0.add_penalty(&p2, 1); + + let id = gs.config.message_id_fn(); + + let message1 = GossipsubMessage { + source: Some(PeerId::random()), + data: vec![1, 2, 3, 4], + sequence_number: Some(1u64), + topics: topics.clone(), + signature: None, + key: None, + validated: true, + }; + + let message2 = GossipsubMessage { + source: Some(PeerId::random()), + data: vec![1, 2, 3, 4, 5], + sequence_number: Some(2u64), + topics: topics.clone(), + signature: None, + key: None, + validated: true, + }; + + let message3 = GossipsubMessage { + source: Some(PeerId::random()), + data: vec![1, 2, 3, 4, 5, 6], + sequence_number: Some(3u64), + topics: topics.clone(), + signature: None, + key: None, + validated: true, + }; + + let message4 = GossipsubMessage { + source: Some(PeerId::random()), + data: vec![1, 2, 3, 4, 5, 6, 7], + sequence_number: Some(4u64), + topics: topics.clone(), + signature: None, + key: None, + validated: true, + }; + + let subscription = GossipsubSubscription { + action: GossipsubSubscriptionAction::Subscribe, + topic_hash: topics[0].clone(), + }; + + let control_action = GossipsubControlAction::IHave { + topic_hash: topics[0].clone(), + message_ids: vec![id(&message2)], + }; + + //clear events + gs.events.clear(); + + //receive from p1 + gs.inject_event( + p1.clone(), + ConnectionId::new(0), + GossipsubRpc { + messages: vec![message1], + subscriptions: vec![subscription.clone()], + control_msgs: vec![control_action], + }, + ); + + //no events got processed + assert!(gs.events.is_empty()); + + let control_action = GossipsubControlAction::IHave { + topic_hash: topics[0].clone(), + message_ids: vec![id(&message4)], + }; + + //receive from p2 + gs.inject_event( + p2.clone(), + ConnectionId::new(0), + GossipsubRpc { + messages: vec![message3], + subscriptions: vec![subscription.clone()], + control_msgs: vec![control_action], + }, + ); + + //events got processed + assert!(!gs.events.is_empty()); + } + + #[test] + fn 
test_ignore_px_from_peers_below_accept_px_threshold() {
+        let config = GossipsubConfig::default();
+        let peer_score_params = PeerScoreParams::default();
+        let mut peer_score_thresholds = PeerScoreThresholds::default();
+        peer_score_thresholds.accept_px_threshold = peer_score_params.app_specific_weight;
+
+        //build mesh with two peers
+        let (mut gs, peers, topics) =
+            build_and_inject_nodes_with_config_and_explicit_and_outbound_and_scoring(
+                2,
+                vec!["test".into()],
+                true,
+                config.clone(),
+                0,
+                0,
+                Some((peer_score_params, peer_score_thresholds)),
+            );
+
+        //increase score of first peer to less than accept_px_threshold
+        gs.set_application_score(&peers[0], 0.99);
+
+        //increase score of second peer to accept_px_threshold
+        gs.set_application_score(&peers[1], 1.0);
+
+        //handle prune from peer peers[0] with px peers
+        let px = vec![PeerInfo {
+            peer: Some(PeerId::random()),
+        }];
+        gs.handle_prune(
+            &peers[0],
+            vec![(
+                topics[0].clone(),
+                px.clone(),
+                Some(config.prune_backoff().as_secs()),
+            )],
+        );
+
+        //assert no dials
+        assert_eq!(
+            gs.events
+                .iter()
+                .filter(|e| match e {
+                    NetworkBehaviourAction::DialPeer { .. } => true,
+                    _ => false,
+                })
+                .count(),
+            0
+        );
+
+        //handle prune from peer peers[1] with px peers
+        let px = vec![PeerInfo {
+            peer: Some(PeerId::random()),
+        }];
+        gs.handle_prune(
+            &peers[1],
+            vec![(
+                topics[0].clone(),
+                px.clone(),
+                Some(config.prune_backoff().as_secs()),
+            )],
+        );
+
+        //assert there are dials now
+        assert!(
+            gs.events
+                .iter()
+                .filter(|e| match e {
+                    NetworkBehaviourAction::DialPeer { .. } => true,
+                    _ => false,
+                })
+                .count()
+                > 0
+        );
+    }
+
+    //TODO test opportunisticGraftThreshold
+
+    #[test]
+    fn test_keep_best_scoring_peers_on_oversubscription() {
+        let config = GossipsubConfigBuilder::new()
+            .mesh_n_low(15)
+            .mesh_n(30)
+            .mesh_n_high(60)
+            .retain_scores(29)
+            .build()
+            .unwrap();
+
+        //build mesh with more peers than the mesh can hold
+        let n = config.mesh_n_high() + 1;
+        let (mut gs, peers, topics) =
+            build_and_inject_nodes_with_config_and_explicit_and_outbound_and_scoring(
+                n,
+                vec!["test".into()],
+                true,
+                config.clone(),
+                0,
+                n,
+                Some((PeerScoreParams::default(), PeerScoreThresholds::default())),
+            );
+
+        // graft all peers; the grafts are accepted since they are outbound
+        for peer in &peers {
+            gs.handle_graft(peer, topics.clone());
+        }
+
+        //assign positive application scores to the peers equalling their index
+        for (index, peer) in peers.iter().enumerate() {
+            gs.set_application_score(peer, index as f64);
+        }
+
+        assert_eq!(gs.mesh[&topics[0]].len(), n);
+
+        dbg!(&peers);
+        dbg!(&gs.mesh[&topics[0]]);
+
+        //heartbeat to prune some peers
+        gs.heartbeat();
+
+        dbg!(&gs.mesh[&topics[0]]);
+
+        assert_eq!(gs.mesh[&topics[0]].len(), config.mesh_n());
+
+        //mesh contains the retain_scores best peers
+        assert!(gs.mesh[&topics[0]].is_superset(
+            &peers[(n - config.retain_scores())..]
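+            // scores equal the peer indices above, so the best-scoring peers are
+            // exactly the highest-index peers in this slice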
+ .iter() + .cloned() + .collect() + )); + } + + #[test] + fn test_scoring_p1() { + let config = GossipsubConfig::default(); + let mut peer_score_params = PeerScoreParams::default(); + let topic = Topic::new("test"); + let topic_hash = topic.hash(); + let mut topic_params = TopicScoreParams::default(); + topic_params.time_in_mesh_weight = 2.0; + topic_params.time_in_mesh_quantum = Duration::from_millis(50); + topic_params.time_in_mesh_cap = 10.0; + topic_params.topic_weight = 0.7; + peer_score_params + .topics + .insert(topic_hash.clone(), topic_params.clone()); + let peer_score_thresholds = PeerScoreThresholds::default(); + + //build mesh with one peer + let (mut gs, peers, _) = + build_and_inject_nodes_with_config_and_explicit_and_outbound_and_scoring( + 1, + vec!["test".into()], + true, + config.clone(), + 0, + 0, + Some((peer_score_params, peer_score_thresholds)), + ); + + //sleep for 2 times the mesh_quantum + sleep(topic_params.time_in_mesh_quantum * 2); + //refresh scores + gs.peer_score.as_mut().unwrap().0.refresh_scores(); + assert!( + gs.peer_score.as_ref().unwrap().0.score(&peers[0]) + >= 2.0 * topic_params.time_in_mesh_weight * topic_params.topic_weight, + "score should be at least 2 * time_in_mesh_weight * topic_weight" + ); + assert!( + gs.peer_score.as_ref().unwrap().0.score(&peers[0]) + < 3.0 * topic_params.time_in_mesh_weight * topic_params.topic_weight, + "score should be less than 3 * time_in_mesh_weight * topic_weight" + ); + + //sleep again for 2 times the mesh_quantum + sleep(topic_params.time_in_mesh_quantum * 2); + //refresh scores + gs.peer_score.as_mut().unwrap().0.refresh_scores(); + assert!( + gs.peer_score.as_ref().unwrap().0.score(&peers[0]) + >= 2.0 * topic_params.time_in_mesh_weight * topic_params.topic_weight, + "score should be at least 4 * time_in_mesh_weight * topic_weight" + ); + + //sleep for enough periods to reach maximum + sleep(topic_params.time_in_mesh_quantum * (topic_params.time_in_mesh_cap - 3.0) as u32); + //refresh scores + gs.peer_score.as_mut().unwrap().0.refresh_scores(); + assert_eq!( + gs.peer_score.as_ref().unwrap().0.score(&peers[0]), + topic_params.time_in_mesh_cap + * topic_params.time_in_mesh_weight + * topic_params.topic_weight, + "score should be exactly time_in_mesh_cap * time_in_mesh_weight * topic_weight" + ); + } + + fn random_message(seq: &mut u64, topics: &Vec) -> GossipsubMessage { + let mut rng = rand::thread_rng(); + *seq += 1; + GossipsubMessage { + source: Some(PeerId::random()), + data: (0..rng.gen_range(10, 30)) + .into_iter() + .map(|_| rng.gen()) + .collect(), + sequence_number: Some(*seq), + topics: topics.clone(), + signature: None, + key: None, + validated: true, + } + } + + #[test] + fn test_scoring_p2() { + let config = GossipsubConfig::default(); + let mut peer_score_params = PeerScoreParams::default(); + let topic = Topic::new("test"); + let topic_hash = topic.hash(); + let mut topic_params = TopicScoreParams::default(); + topic_params.time_in_mesh_weight = 0.0; //deactivate time in mesh + topic_params.first_message_deliveries_weight = 2.0; + topic_params.first_message_deliveries_cap = 10.0; + topic_params.first_message_deliveries_decay = 0.9; + topic_params.topic_weight = 0.7; + peer_score_params + .topics + .insert(topic_hash.clone(), topic_params.clone()); + let peer_score_thresholds = PeerScoreThresholds::default(); + + //build mesh with one peer + let (mut gs, peers, topics) = + build_and_inject_nodes_with_config_and_explicit_and_outbound_and_scoring( + 2, + vec!["test".into()], + true, + config.clone(), 
+ 0, + 0, + Some((peer_score_params, peer_score_thresholds)), + ); + + let mut seq = 0; + let deliver_message = |gs: &mut Gossipsub, index: usize, msg: GossipsubMessage| { + gs.handle_received_message(msg, &peers[index]); + }; + + let m1 = random_message(&mut seq, &topics); + //peer 0 delivers message first + deliver_message(&mut gs, 0, m1.clone()); + //peer 1 delivers message second + deliver_message(&mut gs, 1, m1.clone()); + + assert_eq!( + gs.peer_score.as_ref().unwrap().0.score(&peers[0]), + 1.0 * topic_params.first_message_deliveries_weight * topic_params.topic_weight, + "score should be exactly first_message_deliveries_weight * topic_weight" + ); + + assert_eq!( + gs.peer_score.as_ref().unwrap().0.score(&peers[1]), + 0.0, + "there should be no score for second message deliveries * topic_weight" + ); + + //peer 2 delivers two new messages + deliver_message(&mut gs, 1, random_message(&mut seq, &topics)); + deliver_message(&mut gs, 1, random_message(&mut seq, &topics)); + assert_eq!( + gs.peer_score.as_ref().unwrap().0.score(&peers[1]), + 2.0 * topic_params.first_message_deliveries_weight * topic_params.topic_weight, + "score should be exactly 2 * first_message_deliveries_weight * topic_weight" + ); + + //test decaying + gs.peer_score.as_mut().unwrap().0.refresh_scores(); + + assert_eq!( + gs.peer_score.as_ref().unwrap().0.score(&peers[0]), + 1.0 * topic_params.first_message_deliveries_decay + * topic_params.first_message_deliveries_weight + * topic_params.topic_weight, + "score should be exactly first_message_deliveries_decay * \ + first_message_deliveries_weight * topic_weight" + ); + + assert_eq!( + gs.peer_score.as_ref().unwrap().0.score(&peers[1]), + 2.0 * topic_params.first_message_deliveries_decay + * topic_params.first_message_deliveries_weight + * topic_params.topic_weight, + "score should be exactly 2 * first_message_deliveries_decay * \ + first_message_deliveries_weight * topic_weight" + ); + + //test cap + for _ in 0..topic_params.first_message_deliveries_cap as u64 { + deliver_message(&mut gs, 1, random_message(&mut seq, &topics)); + } + + assert_eq!( + gs.peer_score.as_ref().unwrap().0.score(&peers[1]), + topic_params.first_message_deliveries_cap + * topic_params.first_message_deliveries_weight + * topic_params.topic_weight, + "score should be exactly first_message_deliveries_cap * \ + first_message_deliveries_weight * topic_weight" + ); + } + + #[test] + fn test_scoring_p3() { + let config = GossipsubConfig::default(); + let mut peer_score_params = PeerScoreParams::default(); + let topic = Topic::new("test"); + let topic_hash = topic.hash(); + let mut topic_params = TopicScoreParams::default(); + topic_params.time_in_mesh_weight = 0.0; //deactivate time in mesh + topic_params.first_message_deliveries_weight = 0.0; //deactivate first time deliveries + topic_params.mesh_message_deliveries_weight = -2.0; + topic_params.mesh_message_deliveries_decay = 0.9; + topic_params.mesh_message_deliveries_cap = 10.0; + topic_params.mesh_message_deliveries_threshold = 5.0; + topic_params.mesh_message_deliveries_activation = Duration::from_secs(1); + topic_params.mesh_message_deliveries_window = Duration::from_millis(100); + topic_params.topic_weight = 0.7; + peer_score_params + .topics + .insert(topic_hash.clone(), topic_params.clone()); + let peer_score_thresholds = PeerScoreThresholds::default(); + + //build mesh with two peers + let (mut gs, peers, topics) = + build_and_inject_nodes_with_config_and_explicit_and_outbound_and_scoring( + 2, + vec!["test".into()], + true, + 
config.clone(), + 0, + 0, + Some((peer_score_params, peer_score_thresholds)), + ); + + let mut seq = 0; + let deliver_message = |gs: &mut Gossipsub, index: usize, msg: GossipsubMessage| { + gs.handle_received_message(msg, &peers[index]); + }; + + let mut expected_message_deliveries = 0.0; + + //messages used to test window + let m1 = random_message(&mut seq, &topics); + let m2 = random_message(&mut seq, &topics); + + //peer 1 delivers m1 + deliver_message(&mut gs, 1, m1.clone()); + + //peer 0 delivers two message + deliver_message(&mut gs, 0, random_message(&mut seq, &topics)); + deliver_message(&mut gs, 0, random_message(&mut seq, &topics)); + expected_message_deliveries += 2.0; + + sleep(Duration::from_millis(60)); + + //peer 1 delivers m2 + deliver_message(&mut gs, 1, m2.clone()); + + sleep(Duration::from_millis(70)); + //peer 0 delivers m1 and m2 only m2 gets counted + deliver_message(&mut gs, 0, m1); + deliver_message(&mut gs, 0, m2); + expected_message_deliveries += 1.0; + + sleep(Duration::from_millis(900)); + + //message deliveries penalties get activated, peer 0 has only delivered 3 messages and + // therefore gets a penalty + gs.peer_score.as_mut().unwrap().0.refresh_scores(); + expected_message_deliveries *= 0.9; //decay + + assert_eq!( + gs.peer_score.as_ref().unwrap().0.score(&peers[0]), + (5f64 - expected_message_deliveries).powi(2) * -2.0 * 0.7 + ); + + // peer 0 delivers a lot of messages => message_deliveries should be capped at 10 + for _ in 0..20 { + deliver_message(&mut gs, 0, random_message(&mut seq, &topics)); + } + + expected_message_deliveries = 10.0; + + assert_eq!(gs.peer_score.as_ref().unwrap().0.score(&peers[0]), 0.0); + + //apply 10 decays + for _ in 0..10 { + gs.peer_score.as_mut().unwrap().0.refresh_scores(); + expected_message_deliveries *= 0.9; //decay + } + + assert_eq!( + gs.peer_score.as_ref().unwrap().0.score(&peers[0]), + (5f64 - expected_message_deliveries).powi(2) * -2.0 * 0.7 + ); + } + + #[test] + fn test_scoring_p3b() { + let config = GossipsubConfigBuilder::new() + .prune_backoff(Duration::from_millis(100)) + .build() + .unwrap(); + let mut peer_score_params = PeerScoreParams::default(); + let topic = Topic::new("test"); + let topic_hash = topic.hash(); + let mut topic_params = TopicScoreParams::default(); + topic_params.time_in_mesh_weight = 0.0; //deactivate time in mesh + topic_params.first_message_deliveries_weight = 0.0; //deactivate first time deliveries + topic_params.mesh_message_deliveries_weight = -2.0; + topic_params.mesh_message_deliveries_decay = 0.9; + topic_params.mesh_message_deliveries_cap = 10.0; + topic_params.mesh_message_deliveries_threshold = 5.0; + topic_params.mesh_message_deliveries_activation = Duration::from_secs(1); + topic_params.mesh_message_deliveries_window = Duration::from_millis(100); + topic_params.mesh_failure_penalty_weight = -3.0; + topic_params.mesh_failure_penalty_decay = 0.95; + topic_params.topic_weight = 0.7; + peer_score_params + .topics + .insert(topic_hash.clone(), topic_params.clone()); + peer_score_params.app_specific_weight = 1.0; + let peer_score_thresholds = PeerScoreThresholds::default(); + + //build mesh with one peer + let (mut gs, peers, topics) = + build_and_inject_nodes_with_config_and_explicit_and_outbound_and_scoring( + 1, + vec!["test".into()], + true, + config.clone(), + 0, + 0, + Some((peer_score_params, peer_score_thresholds)), + ); + + let mut seq = 0; + let deliver_message = |gs: &mut Gossipsub, index: usize, msg: GossipsubMessage| { + gs.handle_received_message(msg, &peers[index]); 
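+            // handle_received_message is what feeds the peer-score delivery tracking
+            // (first/duplicate deliveries) that the score assertions below rely on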
+ }; + + let mut expected_message_deliveries = 0.0; + + //add some positive score + gs.peer_score + .as_mut() + .unwrap() + .0 + .set_application_score(&peers[0], 100.0); + + //peer 0 delivers two message + deliver_message(&mut gs, 0, random_message(&mut seq, &topics)); + deliver_message(&mut gs, 0, random_message(&mut seq, &topics)); + expected_message_deliveries += 2.0; + + sleep(Duration::from_millis(1050)); + + //activation kicks in + gs.peer_score.as_mut().unwrap().0.refresh_scores(); + expected_message_deliveries *= 0.9; //decay + + //prune peer + gs.handle_prune(&peers[0], vec![(topics[0].clone(), vec![], None)]); + + //wait backoff + sleep(Duration::from_millis(130)); + + //regraft peer + gs.handle_graft(&peers[0], topics.clone()); + + //the score should now consider p3b + let mut expected_b3 = (5f64 - expected_message_deliveries).powi(2); + assert_eq!( + gs.peer_score.as_ref().unwrap().0.score(&peers[0]), + 100.0 + expected_b3 * -3.0 * 0.7 + ); + + //we can also add a new p3 to the score + + //peer 0 delivers one message + deliver_message(&mut gs, 0, random_message(&mut seq, &topics)); + expected_message_deliveries += 1.0; + + sleep(Duration::from_millis(1050)); + gs.peer_score.as_mut().unwrap().0.refresh_scores(); + expected_message_deliveries *= 0.9; //decay + expected_b3 *= 0.95; + + assert_eq!( + gs.peer_score.as_ref().unwrap().0.score(&peers[0]), + 100.0 + + (expected_b3 * -3.0 + (5f64 - expected_message_deliveries).powi(2) * -2.0) * 0.7 + ); + } + + #[test] + fn test_scoring_p4_valid_message() { + let config = GossipsubConfigBuilder::new() + .validate_messages() + .build() + .unwrap(); + let mut peer_score_params = PeerScoreParams::default(); + let topic = Topic::new("test"); + let topic_hash = topic.hash(); + let mut topic_params = TopicScoreParams::default(); + topic_params.time_in_mesh_weight = 0.0; //deactivate time in mesh + topic_params.first_message_deliveries_weight = 0.0; //deactivate first time deliveries + topic_params.mesh_message_deliveries_weight = 0.0; //deactivate message deliveries + topic_params.mesh_failure_penalty_weight = 0.0; //deactivate mesh failure penalties + topic_params.invalid_message_deliveries_weight = -2.0; + topic_params.invalid_message_deliveries_decay = 0.9; + topic_params.topic_weight = 0.7; + peer_score_params + .topics + .insert(topic_hash.clone(), topic_params.clone()); + peer_score_params.app_specific_weight = 1.0; + let peer_score_thresholds = PeerScoreThresholds::default(); + + //build mesh with two peers + let (mut gs, peers, topics) = + build_and_inject_nodes_with_config_and_explicit_and_outbound_and_scoring( + 1, + vec!["test".into()], + true, + config.clone(), + 0, + 0, + Some((peer_score_params, peer_score_thresholds)), + ); + + let mut seq = 0; + let deliver_message = |gs: &mut Gossipsub, index: usize, msg: GossipsubMessage| { + gs.handle_received_message(msg, &peers[index]); + }; + + let id = config.message_id_fn(); + //peer 0 delivers valid message + let m1 = random_message(&mut seq, &topics); + deliver_message(&mut gs, 0, m1.clone()); + + assert_eq!(gs.peer_score.as_ref().unwrap().0.score(&peers[0]), 0.0); + + //message m1 gets validated + gs.validate_message(&id(&m1), &peers[0], MessageAcceptance::Accept); + + assert_eq!(gs.peer_score.as_ref().unwrap().0.score(&peers[0]), 0.0); + } + + //TODO test p4 with invalid/missing signature messages + + #[test] + fn test_scoring_p4_message_from_self() { + let config = GossipsubConfigBuilder::new() + .validate_messages() + .build() + .unwrap(); + let mut peer_score_params = 
PeerScoreParams::default(); + let topic = Topic::new("test"); + let topic_hash = topic.hash(); + let mut topic_params = TopicScoreParams::default(); + topic_params.time_in_mesh_weight = 0.0; //deactivate time in mesh + topic_params.first_message_deliveries_weight = 0.0; //deactivate first time deliveries + topic_params.mesh_message_deliveries_weight = 0.0; //deactivate message deliveries + topic_params.mesh_failure_penalty_weight = 0.0; //deactivate mesh failure penalties + topic_params.invalid_message_deliveries_weight = -2.0; + topic_params.invalid_message_deliveries_decay = 0.9; + topic_params.topic_weight = 0.7; + peer_score_params + .topics + .insert(topic_hash.clone(), topic_params.clone()); + peer_score_params.app_specific_weight = 1.0; + let peer_score_thresholds = PeerScoreThresholds::default(); + + //build mesh with two peers + let (mut gs, peers, topics) = + build_and_inject_nodes_with_config_and_explicit_and_outbound_and_scoring( + 1, + vec!["test".into()], + true, + config.clone(), + 0, + 0, + Some((peer_score_params, peer_score_thresholds)), + ); + + let mut seq = 0; + let deliver_message = |gs: &mut Gossipsub, index: usize, msg: GossipsubMessage| { + gs.handle_received_message(msg, &peers[index]); + }; + + //peer 0 delivers invalid message from self + let mut m = random_message(&mut seq, &topics); + m.source = Some(gs.publish_info.get_own_id().unwrap().clone()); + + deliver_message(&mut gs, 0, m.clone()); + assert_eq!( + gs.peer_score.as_ref().unwrap().0.score(&peers[0]), + -2.0 * 0.7 + ); + } + + #[test] + fn test_scoring_p4_ignored_message() { + let config = GossipsubConfigBuilder::new() + .validate_messages() + .build() + .unwrap(); + let mut peer_score_params = PeerScoreParams::default(); + let topic = Topic::new("test"); + let topic_hash = topic.hash(); + let mut topic_params = TopicScoreParams::default(); + topic_params.time_in_mesh_weight = 0.0; //deactivate time in mesh + topic_params.first_message_deliveries_weight = 0.0; //deactivate first time deliveries + topic_params.mesh_message_deliveries_weight = 0.0; //deactivate message deliveries + topic_params.mesh_failure_penalty_weight = 0.0; //deactivate mesh failure penalties + topic_params.invalid_message_deliveries_weight = -2.0; + topic_params.invalid_message_deliveries_decay = 0.9; + topic_params.topic_weight = 0.7; + peer_score_params + .topics + .insert(topic_hash.clone(), topic_params.clone()); + peer_score_params.app_specific_weight = 1.0; + let peer_score_thresholds = PeerScoreThresholds::default(); + + //build mesh with two peers + let (mut gs, peers, topics) = + build_and_inject_nodes_with_config_and_explicit_and_outbound_and_scoring( + 1, + vec!["test".into()], + true, + config.clone(), + 0, + 0, + Some((peer_score_params, peer_score_thresholds)), + ); + + let mut seq = 0; + let deliver_message = |gs: &mut Gossipsub, index: usize, msg: GossipsubMessage| { + gs.handle_received_message(msg, &peers[index]); + }; + + //peer 0 delivers ignored message + let m1 = random_message(&mut seq, &topics); + deliver_message(&mut gs, 0, m1.clone()); + + assert_eq!(gs.peer_score.as_ref().unwrap().0.score(&peers[0]), 0.0); + + //message m1 gets ignored + gs.validate_message( + &(config.message_id_fn())(&m1), + &peers[0], + MessageAcceptance::Ignore, + ); + + assert_eq!(gs.peer_score.as_ref().unwrap().0.score(&peers[0]), 0.0); + } + + #[test] + fn test_scoring_p4_application_invalidated_message() { + let config = GossipsubConfigBuilder::new() + .validate_messages() + .build() + .unwrap(); + let mut peer_score_params = 
PeerScoreParams::default(); + let topic = Topic::new("test"); + let topic_hash = topic.hash(); + let mut topic_params = TopicScoreParams::default(); + topic_params.time_in_mesh_weight = 0.0; //deactivate time in mesh + topic_params.first_message_deliveries_weight = 0.0; //deactivate first time deliveries + topic_params.mesh_message_deliveries_weight = 0.0; //deactivate message deliveries + topic_params.mesh_failure_penalty_weight = 0.0; //deactivate mesh failure penalties + topic_params.invalid_message_deliveries_weight = -2.0; + topic_params.invalid_message_deliveries_decay = 0.9; + topic_params.topic_weight = 0.7; + peer_score_params + .topics + .insert(topic_hash.clone(), topic_params.clone()); + peer_score_params.app_specific_weight = 1.0; + let peer_score_thresholds = PeerScoreThresholds::default(); + + //build mesh with two peers + let (mut gs, peers, topics) = + build_and_inject_nodes_with_config_and_explicit_and_outbound_and_scoring( + 1, + vec!["test".into()], + true, + config.clone(), + 0, + 0, + Some((peer_score_params, peer_score_thresholds)), + ); + + let mut seq = 0; + let deliver_message = |gs: &mut Gossipsub, index: usize, msg: GossipsubMessage| { + gs.handle_received_message(msg, &peers[index]); + }; + + //peer 0 delivers invalid message + let m1 = random_message(&mut seq, &topics); + deliver_message(&mut gs, 0, m1.clone()); + + assert_eq!(gs.peer_score.as_ref().unwrap().0.score(&peers[0]), 0.0); + + //message m1 gets rejected + gs.validate_message( + &(config.message_id_fn())(&m1), + &peers[0], + MessageAcceptance::Reject, + ); + + assert_eq!( + gs.peer_score.as_ref().unwrap().0.score(&peers[0]), + -2.0 * 0.7 + ); + } + + #[test] + fn test_scoring_p4_application_invalid_message_from_two_peers() { + let config = GossipsubConfigBuilder::new() + .validate_messages() + .build() + .unwrap(); + let mut peer_score_params = PeerScoreParams::default(); + let topic = Topic::new("test"); + let topic_hash = topic.hash(); + let mut topic_params = TopicScoreParams::default(); + topic_params.time_in_mesh_weight = 0.0; //deactivate time in mesh + topic_params.first_message_deliveries_weight = 0.0; //deactivate first time deliveries + topic_params.mesh_message_deliveries_weight = 0.0; //deactivate message deliveries + topic_params.mesh_failure_penalty_weight = 0.0; //deactivate mesh failure penalties + topic_params.invalid_message_deliveries_weight = -2.0; + topic_params.invalid_message_deliveries_decay = 0.9; + topic_params.topic_weight = 0.7; + peer_score_params + .topics + .insert(topic_hash.clone(), topic_params.clone()); + peer_score_params.app_specific_weight = 1.0; + let peer_score_thresholds = PeerScoreThresholds::default(); + + //build mesh with two peers + let (mut gs, peers, topics) = + build_and_inject_nodes_with_config_and_explicit_and_outbound_and_scoring( + 2, + vec!["test".into()], + true, + config.clone(), + 0, + 0, + Some((peer_score_params, peer_score_thresholds)), + ); + + let mut seq = 0; + let deliver_message = |gs: &mut Gossipsub, index: usize, msg: GossipsubMessage| { + gs.handle_received_message(msg, &peers[index]); + }; + + //peer 0 delivers invalid message + let m1 = random_message(&mut seq, &topics); + deliver_message(&mut gs, 0, m1.clone()); + + //peer 1 delivers same message + deliver_message(&mut gs, 1, m1.clone()); + + assert_eq!(gs.peer_score.as_ref().unwrap().0.score(&peers[0]), 0.0); + assert_eq!(gs.peer_score.as_ref().unwrap().0.score(&peers[1]), 0.0); + + //message m1 gets rejected + gs.validate_message( + &(config.message_id_fn())(&m1), + &peers[0], + 
MessageAcceptance::Reject, + ); + + assert_eq!( + gs.peer_score.as_ref().unwrap().0.score(&peers[0]), + -2.0 * 0.7 + ); + assert_eq!( + gs.peer_score.as_ref().unwrap().0.score(&peers[1]), + -2.0 * 0.7 + ); + } + + #[test] + fn test_scoring_p4_three_application_invalid_messages() { + let config = GossipsubConfigBuilder::new() + .validate_messages() + .build() + .unwrap(); + let mut peer_score_params = PeerScoreParams::default(); + let topic = Topic::new("test"); + let topic_hash = topic.hash(); + let mut topic_params = TopicScoreParams::default(); + topic_params.time_in_mesh_weight = 0.0; //deactivate time in mesh + topic_params.first_message_deliveries_weight = 0.0; //deactivate first time deliveries + topic_params.mesh_message_deliveries_weight = 0.0; //deactivate message deliveries + topic_params.mesh_failure_penalty_weight = 0.0; //deactivate mesh failure penalties + topic_params.invalid_message_deliveries_weight = -2.0; + topic_params.invalid_message_deliveries_decay = 0.9; + topic_params.topic_weight = 0.7; + peer_score_params + .topics + .insert(topic_hash.clone(), topic_params.clone()); + peer_score_params.app_specific_weight = 1.0; + let peer_score_thresholds = PeerScoreThresholds::default(); + + //build mesh with one peer + let (mut gs, peers, topics) = + build_and_inject_nodes_with_config_and_explicit_and_outbound_and_scoring( + 1, + vec!["test".into()], + true, + config.clone(), + 0, + 0, + Some((peer_score_params, peer_score_thresholds)), + ); + + let mut seq = 0; + let deliver_message = |gs: &mut Gossipsub, index: usize, msg: GossipsubMessage| { + gs.handle_received_message(msg, &peers[index]); + }; + + //peer 0 delivers two invalid message + let m1 = random_message(&mut seq, &topics); + let m2 = random_message(&mut seq, &topics); + let m3 = random_message(&mut seq, &topics); + deliver_message(&mut gs, 0, m1.clone()); + deliver_message(&mut gs, 0, m2.clone()); + deliver_message(&mut gs, 0, m3.clone()); + + assert_eq!(gs.peer_score.as_ref().unwrap().0.score(&peers[0]), 0.0); + + //messages gets rejected + gs.validate_message( + &(config.message_id_fn())(&m1), + &peers[0], + MessageAcceptance::Reject, + ); + gs.validate_message( + &(config.message_id_fn())(&m2), + &peers[0], + MessageAcceptance::Reject, + ); + gs.validate_message( + &(config.message_id_fn())(&m3), + &peers[0], + MessageAcceptance::Reject, + ); + + //number of invalid messages gets squared + assert_eq!( + gs.peer_score.as_ref().unwrap().0.score(&peers[0]), + 9.0 * -2.0 * 0.7 + ); + } + + #[test] + fn test_scoring_p4_decay() { + let config = GossipsubConfigBuilder::new() + .validate_messages() + .build() + .unwrap(); + let mut peer_score_params = PeerScoreParams::default(); + let topic = Topic::new("test"); + let topic_hash = topic.hash(); + let mut topic_params = TopicScoreParams::default(); + topic_params.time_in_mesh_weight = 0.0; //deactivate time in mesh + topic_params.first_message_deliveries_weight = 0.0; //deactivate first time deliveries + topic_params.mesh_message_deliveries_weight = 0.0; //deactivate message deliveries + topic_params.mesh_failure_penalty_weight = 0.0; //deactivate mesh failure penalties + topic_params.invalid_message_deliveries_weight = -2.0; + topic_params.invalid_message_deliveries_decay = 0.9; + topic_params.topic_weight = 0.7; + peer_score_params + .topics + .insert(topic_hash.clone(), topic_params.clone()); + peer_score_params.app_specific_weight = 1.0; + let peer_score_thresholds = PeerScoreThresholds::default(); + + //build mesh with one peer + let (mut gs, peers, topics) = + 
build_and_inject_nodes_with_config_and_explicit_and_outbound_and_scoring( + 1, + vec!["test".into()], + true, + config.clone(), + 0, + 0, + Some((peer_score_params, peer_score_thresholds)), + ); + + let mut seq = 0; + let deliver_message = |gs: &mut Gossipsub, index: usize, msg: GossipsubMessage| { + gs.handle_received_message(msg, &peers[index]); + }; + + //peer 0 delivers invalid message + let m1 = random_message(&mut seq, &topics); + deliver_message(&mut gs, 0, m1.clone()); + + assert_eq!(gs.peer_score.as_ref().unwrap().0.score(&peers[0]), 0.0); + + //message m1 gets rejected + gs.validate_message( + &(config.message_id_fn())(&m1), + &peers[0], + MessageAcceptance::Reject, + ); + + assert_eq!( + gs.peer_score.as_ref().unwrap().0.score(&peers[0]), + -2.0 * 0.7 + ); + + //we decay + gs.peer_score.as_mut().unwrap().0.refresh_scores(); + + // the number of invalids gets decayed to 0.9 and then squared in the score + assert_eq!( + gs.peer_score.as_ref().unwrap().0.score(&peers[0]), + 0.9 * 0.9 * -2.0 * 0.7 + ); + } + + #[test] + fn test_scoring_p5() { + let mut peer_score_params = PeerScoreParams::default(); + peer_score_params.app_specific_weight = 2.0; + + //build mesh with one peer + let (mut gs, peers, _) = + build_and_inject_nodes_with_config_and_explicit_and_outbound_and_scoring( + 1, + vec!["test".into()], + true, + GossipsubConfig::default(), + 0, + 0, + Some((peer_score_params, PeerScoreThresholds::default())), + ); + + gs.set_application_score(&peers[0], 1.1); + + assert_eq!( + gs.peer_score.as_ref().unwrap().0.score(&peers[0]), + 1.1 * 2.0 + ); + } + + #[test] + fn test_scoring_p6() { + let mut peer_score_params = PeerScoreParams::default(); + peer_score_params.ip_colocation_factor_threshold = 5.0; + peer_score_params.ip_colocation_factor_weight = -2.0; + + let (mut gs, _, _) = + build_and_inject_nodes_with_config_and_explicit_and_outbound_and_scoring( + 0, + vec![], + false, + GossipsubConfig::default(), + 0, + 0, + Some((peer_score_params, PeerScoreThresholds::default())), + ); + + //create 5 peers with the same ip + let addr = Multiaddr::from(Ipv4Addr::new(10, 1, 2, 3)); + let peers = vec![ + add_peer_with_addr(&mut gs, &vec![], false, false, addr.clone()), + add_peer_with_addr(&mut gs, &vec![], false, false, addr.clone()), + add_peer_with_addr(&mut gs, &vec![], true, false, addr.clone()), + add_peer_with_addr(&mut gs, &vec![], true, false, addr.clone()), + add_peer_with_addr(&mut gs, &vec![], true, true, addr.clone()), + ]; + + //create 4 other peers with other ip + let addr2 = Multiaddr::from(Ipv4Addr::new(10, 1, 2, 4)); + let others = vec![ + add_peer_with_addr(&mut gs, &vec![], false, false, addr2.clone()), + add_peer_with_addr(&mut gs, &vec![], false, false, addr2.clone()), + add_peer_with_addr(&mut gs, &vec![], true, false, addr2.clone()), + add_peer_with_addr(&mut gs, &vec![], true, false, addr2.clone()), + ]; + + //no penalties yet + for peer in peers.iter().chain(others.iter()) { + assert_eq!(gs.peer_score.as_ref().unwrap().0.score(peer), 0.0); + } + + //add additional connection for 3 others with addr + for i in 0..3 { + gs.inject_connection_established( + &others[i], + &ConnectionId::new(0), + &ConnectedPoint::Dialer { + address: addr.clone(), + }, + ); + } + + //penalties apply squared + for peer in peers.iter().chain(others.iter().take(3)) { + assert_eq!(gs.peer_score.as_ref().unwrap().0.score(peer), 9.0 * -2.0); + } + //fourth other peer still no penalty + assert_eq!(gs.peer_score.as_ref().unwrap().0.score(&others[3]), 0.0); + + //add additional connection for 3 
of the peers to addr2 + for i in 0..3 { + gs.inject_connection_established( + &peers[i], + &ConnectionId::new(0), + &ConnectedPoint::Dialer { + address: addr2.clone(), + }, + ); + } + + //double penalties for the first three of each + for peer in peers.iter().take(3).chain(others.iter().take(3)) { + assert_eq!( + gs.peer_score.as_ref().unwrap().0.score(peer), + (9.0 + 4.0) * -2.0 + ); + } + + //single penalties for the rest + for peer in peers.iter().skip(3) { + assert_eq!(gs.peer_score.as_ref().unwrap().0.score(peer), 9.0 * -2.0); + } + assert_eq!( + gs.peer_score.as_ref().unwrap().0.score(&others[3]), + 4.0 * -2.0 + ); + + //two times same ip doesn't count twice + gs.inject_connection_established( + &peers[0], + &ConnectionId::new(0), + &ConnectedPoint::Dialer { + address: addr.clone(), + }, + ); + + //nothing changed + //double penalties for the first three of each + for peer in peers.iter().take(3).chain(others.iter().take(3)) { + assert_eq!( + gs.peer_score.as_ref().unwrap().0.score(peer), + (9.0 + 4.0) * -2.0 + ); + } + + //single penalties for the rest + for peer in peers.iter().skip(3) { + assert_eq!(gs.peer_score.as_ref().unwrap().0.score(peer), 9.0 * -2.0); + } + assert_eq!( + gs.peer_score.as_ref().unwrap().0.score(&others[3]), + 4.0 * -2.0 + ); + } + + #[test] + fn test_scoring_p7_grafts_before_backoff() { + let config = GossipsubConfigBuilder::new() + .prune_backoff(Duration::from_millis(200)) + .graft_flood_threshold(Duration::from_millis(100)) + .build() + .unwrap(); + let mut peer_score_params = PeerScoreParams::default(); + peer_score_params.behaviour_penalty_weight = -2.0; + peer_score_params.behaviour_penalty_decay = 0.9; + + let (mut gs, peers, topics) = + build_and_inject_nodes_with_config_and_explicit_and_outbound_and_scoring( + 2, + vec!["test".into()], + false, + config, + 0, + 0, + Some((peer_score_params, PeerScoreThresholds::default())), + ); + + //remove peers from mesh and send prune to them => this adds a backoff for the peers + for i in 0..2 { + gs.mesh.get_mut(&topics[0]).unwrap().remove(&peers[i]); + gs.send_graft_prune( + HashMap::new(), + vec![(peers[i].clone(), vec![topics[0].clone()])] + .into_iter() + .collect(), + HashSet::new(), + ); + } + + //wait 50 millisecs + sleep(Duration::from_millis(50)); + + //first peer tries to graft + gs.handle_graft(&peers[0], vec![topics[0].clone()]); + + //double behaviour penalty for first peer (squared) + assert_eq!( + gs.peer_score.as_ref().unwrap().0.score(&peers[0]), + 4.0 * -2.0 + ); + + //wait 100 millisecs + sleep(Duration::from_millis(100)); + + //second peer tries to graft + gs.handle_graft(&peers[1], vec![topics[0].clone()]); + + //single behaviour penalty for second peer + assert_eq!( + gs.peer_score.as_ref().unwrap().0.score(&peers[1]), + 1.0 * -2.0 + ); + + //test decay + gs.peer_score.as_mut().unwrap().0.refresh_scores(); + + assert_eq!( + gs.peer_score.as_ref().unwrap().0.score(&peers[0]), + 4.0 * 0.9 * 0.9 * -2.0 + ); + assert_eq!( + gs.peer_score.as_ref().unwrap().0.score(&peers[1]), + 1.0 * 0.9 * 0.9 * -2.0 + ); + } } diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs index 9d1428e97f6..ea05d030522 100644 --- a/protocols/gossipsub/src/config.rs +++ b/protocols/gossipsub/src/config.rs @@ -74,6 +74,11 @@ pub struct GossipsubConfig { /// is 12). mesh_n_high: usize, + /// Affects how peers are selected when pruning a mesh due to over subscription. 
+ // At least `retain_scores` of the retained peers will be high-scoring, while the remainder are + // chosen randomly (D_score in the spec, default is 4). + retain_scores: usize, + /// Minimum number of peers to emit gossip to during a heartbeat (D_lazy in the spec, /// default is 6). gossip_lazy: usize, @@ -159,6 +164,11 @@ pub struct GossipsubConfig { /// The default is true. flood_publish: bool, + // If a GRAFT comes before `graft_flood_threshold` has elapsed since the last PRUNE, + // then there is an extra score penalty applied to the peer through P7. The default is 10 + // seconds. + graft_flood_threshold: Duration, + /// Minimum number of outbound peers in the mesh network before adding more (D_out in the spec). /// This value must be smaller or equal than `mesh_n / 2` and smaller than `mesh_n_low`. /// The default is 2. @@ -205,6 +215,13 @@ impl GossipsubConfig { self.mesh_n_high } + /// Affects how peers are selected when pruning a mesh due to over subscription. + // At least `retain_scores` of the retained peers will be high-scoring, while the remainder are + // chosen randomly (D_score in the spec, default is 4). + pub fn retain_scores(&self) -> usize { + self.retain_scores + } + /// Minimum number of peers to emit gossip to during a heartbeat (D_lazy in the spec, /// default is 6). pub fn gossip_lazy(&self) -> usize { @@ -322,6 +339,12 @@ impl GossipsubConfig { self.flood_publish } + // If a GRAFT comes before `graft_flood_threshold` has elapsed since the last PRUNE, + // then there is an extra score penalty applied to the peer through P7. + pub fn graft_flood_threshold(&self) -> Duration { + self.graft_flood_threshold + } + /// Minimum number of outbound peers in the mesh network before adding more (D_out in the spec). /// This value must be smaller or equal than `mesh_n / 2` and smaller than `mesh_n_low`. /// The default is 2. @@ -363,6 +386,7 @@ impl GossipsubConfigBuilder { mesh_n: 6, mesh_n_low: 5, mesh_n_high: 12, + retain_scores: 4, gossip_lazy: 6, // default to mesh_n gossip_factor: 0.25, heartbeat_initial_delay: Duration::from_secs(5), @@ -392,6 +416,7 @@ impl GossipsubConfigBuilder { prune_backoff: Duration::from_secs(60), backoff_slack: 1, flood_publish: true, + graft_flood_threshold: Duration::from_secs(10), mesh_outbound_min: 2, }, } @@ -434,6 +459,14 @@ impl GossipsubConfigBuilder { self } + /// Affects how peers are selected when pruning a mesh due to over subscription. + // At least `retain_scores` of the retained peers will be high-scoring, while the remainder are + // chosen randomly (D_score in the spec, default is 4). + pub fn retain_scores(&mut self, retain_scores: usize) -> &mut Self { + self.config.retain_scores = retain_scores; + self + } + /// Minimum number of peers to emit gossip to during a heartbeat (D_lazy in the spec, /// default is 6). pub fn gossip_lazy(&mut self, gossip_lazy: usize) -> &mut Self { @@ -565,6 +598,13 @@ impl GossipsubConfigBuilder { self } + // If a GRAFT comes before `graft_flood_threshold` has elapsed since the last PRUNE, + // then there is an extra score penalty applied to the peer through P7. + pub fn graft_flood_threshold(&mut self, graft_flood_threshold: Duration) -> &mut Self { + self.config.graft_flood_threshold = graft_flood_threshold; + self + } + /// Minimum number of outbound peers in the mesh network before adding more (D_out in the spec). /// This value must be smaller or equal than `mesh_n / 2` and smaller than `mesh_n_low`. /// The default is 2. 
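The new `retain_scores` and `graft_flood_threshold` options are plain builder settings. A minimal sketch of combining them with the scoring types exported by this change follows; all names are taken from this diff, the concrete values are only illustrative, and the hook for installing the resulting `PeerScoreParams`/`PeerScoreThresholds` on the `Gossipsub` behaviour is not shown in this excerpt:

```rust
use std::time::Duration;

use libp2p_gossipsub::{
    GossipsubConfig, GossipsubConfigBuilder, IdentTopic as Topic, PeerScoreParams,
    PeerScoreThresholds, TopicScoreParams,
};

fn scoring_setup() -> (GossipsubConfig, PeerScoreParams, PeerScoreThresholds) {
    // Illustrative values: keep more high-scoring peers on oversubscription and
    // penalise GRAFTs that arrive shortly after a PRUNE.
    let config = GossipsubConfigBuilder::new()
        .retain_scores(6)
        .graft_flood_threshold(Duration::from_secs(10))
        .build()
        .expect("valid config");

    // Per-topic scoring parameters for one example topic.
    let topic = Topic::new("example");
    let mut topic_params = TopicScoreParams::default();
    topic_params.time_in_mesh_weight = 1.0; // illustrative weight
    topic_params.first_message_deliveries_weight = 1.0; // illustrative weight

    let mut peer_score_params = PeerScoreParams::default();
    peer_score_params.topics.insert(topic.hash(), topic_params);

    (config, peer_score_params, PeerScoreThresholds::default())
}
```

The weights and `retain_scores(6)` above are arbitrary examples; the defaults introduced by this diff are `retain_scores = 4` and `graft_flood_threshold = 10` seconds.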
@@ -608,6 +648,7 @@ impl std::fmt::Debug for GossipsubConfig { let _ = builder.field("mesh_n", &self.mesh_n); let _ = builder.field("mesh_n_low", &self.mesh_n_low); let _ = builder.field("mesh_n_high", &self.mesh_n_high); + let _ = builder.field("retain_scores", &self.retain_scores); let _ = builder.field("gossip_lazy", &self.gossip_lazy); let _ = builder.field("gossip_factor", &self.gossip_factor); let _ = builder.field("heartbeat_initial_delay", &self.heartbeat_initial_delay); @@ -622,6 +663,7 @@ impl std::fmt::Debug for GossipsubConfig { let _ = builder.field("prune_backoff", &self.prune_backoff); let _ = builder.field("backoff_slack", &self.backoff_slack); let _ = builder.field("flood_publish", &self.flood_publish); + let _ = builder.field("graft_flood_threshold", &self.graft_flood_threshold); let _ = builder.field("mesh_outbound_min", &self.mesh_outbound_min); builder.finish() } diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs index 0a0a92ace07..e4c69c40fcf 100644 --- a/protocols/gossipsub/src/lib.rs +++ b/protocols/gossipsub/src/lib.rs @@ -151,6 +151,10 @@ mod rpc_proto { pub use self::behaviour::{Gossipsub, GossipsubEvent, GossipsubRpc, MessageAuthenticity}; pub use self::config::{GossipsubConfig, GossipsubConfigBuilder, ValidationMode}; +pub use self::peer_score::{ + score_parameter_decay, score_parameter_decay_with_base, PeerScoreParams, PeerScoreThresholds, + TopicScoreParams, +}; pub use self::protocol::{GossipsubMessage, MessageId}; pub use self::topic::{Hasher, Topic, TopicHash}; pub type IdentTopic = Topic; diff --git a/protocols/gossipsub/src/mcache.rs b/protocols/gossipsub/src/mcache.rs index 54f6335fd99..db5c6d7645f 100644 --- a/protocols/gossipsub/src/mcache.rs +++ b/protocols/gossipsub/src/mcache.rs @@ -21,6 +21,7 @@ use crate::protocol::{GossipsubMessage, MessageId}; use crate::topic::TopicHash; use std::collections::HashMap; +use log::{warn}; /// CacheEntry stored in the history. #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -117,12 +118,27 @@ impl MessageCache { /// last entry pub fn shift(&mut self) { for entry in self.history.pop().expect("history is always > 1") { - self.msgs.remove(&entry.mid); + if let Some(msg) = self.msgs.remove(&entry.mid) { + if !msg.validated { + warn!("The message with id {} got removed from the cache without being + validated. If GossipsubConfig::validate_messages is true, the implementing + application has to ensure that Gossipsub::validate_message gets called for + each received message within the cache timeout time.", &entry.mid); + } + } } // Insert an empty vec in position 0 self.history.insert(0, Vec::new()); } + + /// Removes a message from the cache + pub fn remove(&mut self, message_id: &MessageId) -> Option { + //We only remove the message from msgs and keep the message_id in the history vector. + //The id in the history vector will simply be ignored on popping. + + self.msgs.remove(message_id) + } } #[cfg(test)] diff --git a/protocols/gossipsub/src/peer_score/mod.rs b/protocols/gossipsub/src/peer_score/mod.rs index 5d17e46d23e..8b32b87b830 100644 --- a/protocols/gossipsub/src/peer_score/mod.rs +++ b/protocols/gossipsub/src/peer_score/mod.rs @@ -1,15 +1,18 @@ //! Manages and stores the Scoring logic of a particular peer on the gossipsub behaviour. 
-use crate::{GossipsubMessage, Hasher, MessageId, Topic, TopicHash}; +use crate::{GossipsubMessage, MessageId, TopicHash}; use libp2p_core::PeerId; use log::warn; use lru_time_cache::LruCache; -use std::collections::{HashMap, HashSet}; +use std::collections::{hash_map, HashMap, HashSet}; use std::net::IpAddr; use std::time::{Duration, Instant}; mod params; -use params::*; +pub use params::{ + score_parameter_decay, score_parameter_decay_with_base, PeerScoreParams, PeerScoreThresholds, + TopicScoreParams, +}; #[cfg(test)] mod tests; @@ -17,7 +20,7 @@ mod tests; /// The number of seconds delivery messages are stored in the cache. const TIME_CACHE_DURATION: u64 = 120; -struct PeerScore { +pub(crate) struct PeerScore { params: PeerScoreParams, /// The score parameters. peer_stats: HashMap, @@ -36,9 +39,11 @@ struct PeerStats { /// Stats per topic. topics: HashMap, /// IP tracking for individual peers. - known_ips: Vec, + known_ips: HashSet, /// Behaviour penalty that is applied to the peer, assigned by the behaviour. behaviour_penalty: f64, + /// Application specific score. Can be manipulated by calling PeerScore::set_application_score + application_score: f64, } enum ConnectionStatus { @@ -56,8 +61,9 @@ impl Default for PeerStats { PeerStats { status: ConnectionStatus::Connected, topics: HashMap::new(), - known_ips: Vec::new(), + known_ips: HashSet::new(), behaviour_penalty: 0f64, + application_score: 0f64, } } } @@ -183,20 +189,6 @@ impl PeerScore { } } - /// Creates a new `PeerScore` with a non-default message id function. - pub fn new_with_msg_id( - params: PeerScoreParams, - msg_id: fn(&GossipsubMessage) -> MessageId, - ) -> Self { - PeerScore { - params, - peer_stats: HashMap::new(), - peer_ips: HashMap::new(), - deliveries: LruCache::with_expiry_duration(Duration::from_secs(TIME_CACHE_DURATION)), - msg_id, - } - } - /// Returns the score for a peer. pub fn score(&self, peer_id: &PeerId) -> f64 { let peer_stats = match self.peer_stats.get(peer_id) { @@ -232,7 +224,14 @@ impl PeerScore { } // P2: first message deliveries - let p2 = topic_stats.first_message_deliveries as f64; + let p2 = { + let v = topic_stats.first_message_deliveries as f64; + if v < topic_params.first_message_deliveries_cap { + v + } else { + topic_params.first_message_deliveries_cap + } + }; topic_score += p2 * topic_params.first_message_deliveries_weight; dbg!(topic_score); @@ -275,11 +274,8 @@ impl PeerScore { dbg!(score); // P5: application-specific score - //TODO: Add in - /* - let p5 = self.params.app_specific_score(peer_id); + let p5 = peer_stats.application_score; score += p5 * self.params.app_specific_weight; - */ // P6: IP collocation factor for ip in peer_stats.known_ips.iter() { @@ -312,6 +308,18 @@ impl PeerScore { } } + fn remove_ips_for_peer( + peer_stats: &PeerStats, + peer_ips: &mut HashMap>, + peer_id: &PeerId, + ) { + for ip in peer_stats.known_ips.iter() { + if let Some(peer_set) = peer_ips.get_mut(ip) { + peer_set.remove(peer_id); + } + } + } + pub fn refresh_scores(&mut self) { let now = Instant::now(); let params_ref = &self.params; @@ -321,11 +329,7 @@ impl PeerScore { // has the retention period expired? 
if now > expire { // yes, throw it away (but clean up the IP tracking first) - for ip in peer_stats.known_ips.iter() { - if let Some(peer_set) = peer_ips_ref.get_mut(ip) { - peer_set.remove(peer_id); - } - } + Self::remove_ips_for_peer(peer_stats, peer_ips_ref, peer_id); // re address this, use retain or entry return false; } @@ -383,26 +387,38 @@ impl PeerScore { }); } - /// Gets a mutable reference to the underlying IPs for a peer, if they exist. - pub fn get_ips_mut(&mut self, peer_id: &PeerId) -> Option<&mut [IpAddr]> { - let peer_stats = self.peer_stats.get_mut(peer_id)?; - Some(&mut peer_stats.known_ips) + /// Adds a connected peer to `PeerScore`, initialising with empty ips (ips get added later + /// through add_ip. + pub fn add_peer(&mut self, peer_id: PeerId) { + let peer_stats = self.peer_stats.entry(peer_id.clone()).or_default(); + + // mark the peer as connected + peer_stats.status = ConnectionStatus::Connected; } - /// Adds a connected peer to `PeerScore`, initialising with default stats. - pub fn add_peer(&mut self, peer_id: PeerId, known_ips: Vec) { + /// Adds a new ip to a peer, if the peer is not yet known creates a new peer_stats entry for it + pub fn add_ip(&mut self, peer_id: PeerId, ip: IpAddr) { let peer_stats = self.peer_stats.entry(peer_id.clone()).or_default(); - // mark the peer as connected + //mark the peer as connected (currently the default is connected, but we don't want to + // rely on the default). peer_stats.status = ConnectionStatus::Connected; - peer_stats.known_ips = known_ips.clone(); - - // add known ips to the peer score tracking map - for ip in known_ips { - self.peer_ips - .entry(ip) - .or_insert_with(|| HashSet::new()) - .insert(peer_id.clone()); + + //insert the ip + peer_stats.known_ips.insert(ip.clone()); + self.peer_ips + .entry(ip) + .or_insert_with(|| HashSet::new()) + .insert(peer_id); + } + + /// Removes an ip from a peer + pub fn remove_ip(&mut self, peer_id: &PeerId, ip: &IpAddr) { + if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) { + peer_stats.known_ips.remove(ip); + if let Some(peer_ids) = self.peer_ips.get_mut(ip) { + peer_ids.remove(peer_id); + } } } @@ -411,7 +427,10 @@ impl PeerScore { pub fn remove_peer(&mut self, peer_id: &PeerId) { // we only retain non-positive scores of peers if self.score(peer_id) > 0f64 { - self.peer_stats.remove(peer_id); + if let hash_map::Entry::Occupied(entry) = self.peer_stats.entry(peer_id.clone()) { + Self::remove_ips_for_peer(entry.get(), &mut self.peer_ips, peer_id); + entry.remove(); + } return; } @@ -445,12 +464,6 @@ impl PeerScore { } } - /// Handles peer scoring functionality on subscription. - pub fn join(&mut self, _topic: Topic) {} - - /// Handles peer scoring functionality when un-subscribing from a topic. - pub fn leave(&mut self, _topic: Topic) {} - /// Handles scoring functionality as a peer GRAFTs to a topic. pub fn graft(&mut self, peer_id: &PeerId, topic: impl Into) { let topic = topic.into(); @@ -487,9 +500,11 @@ impl PeerScore { } } - //TODO: Required? pub fn validate_message(&mut self, _from: &PeerId, _msg: &GossipsubMessage) { // adds an empty record with the message id + self.deliveries + .entry((self.msg_id)(_msg)) + .or_insert_with(|| DeliveryRecord::default()); } pub fn deliver_message(&mut self, from: &PeerId, msg: &GossipsubMessage) { @@ -600,11 +615,11 @@ impl PeerScore { DeliveryStatus::Unknown => { // the message is being validated; track the peer delivery and wait for // the Deliver/Reject notification. 
- record.peers.remove(from); + record.peers.insert(from.clone()); } DeliveryStatus::Valid => { // mark the peer delivery time to only count a duplicate delivery once. - record.peers.remove(from); + record.peers.insert(from.clone()); let validated = record.validated.clone(); self.mark_duplicate_message_delivery(from, msg, Some(validated)); } @@ -618,6 +633,17 @@ impl PeerScore { } } + /// Sets the application specific score for a peer. Returns true if the peer is the peer is + /// connected or if the score of the peer is not yet expired and false otherwise. + pub fn set_application_score(&mut self, peer_id: &PeerId, new_score: f64) -> bool { + if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) { + peer_stats.application_score = new_score; + true + } else { + false + } + } + /// Increments the "invalid message deliveries" counter for all scored topics the message /// is published in. fn mark_invalid_message_delivery(&mut self, peer_id: &PeerId, msg: &GossipsubMessage) { @@ -666,7 +692,7 @@ impl PeerScore { if topic_stats.mesh_message_deliveries + 1f64 > cap { cap } else { - topic_stats.first_message_deliveries + 1f64 + topic_stats.mesh_message_deliveries + 1f64 }; } } @@ -683,6 +709,11 @@ impl PeerScore { validated_time: Option, ) { if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) { + let now = if validated_time.is_some() { + Some(Instant::now()) + } else { + None + }; for topic_hash in msg.topics.iter() { if let Some(topic_stats) = peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params) @@ -698,39 +729,33 @@ impl PeerScore { // the message was received before we finished validation and thus falls within the mesh // delivery window. if let Some(validated_time) = validated_time { - let now = Instant::now(); - let window_time = validated_time - .checked_add(topic_params.mesh_message_deliveries_window) - .unwrap_or_else(|| now.clone()); - if now > window_time { - continue; + if let Some(now) = &now { + //should always be true + let window_time = validated_time + .checked_add(topic_params.mesh_message_deliveries_window) + .unwrap_or_else(|| now.clone()); + if now > &window_time { + continue; + } } - - let cap = topic_params.mesh_message_deliveries_cap; - topic_stats.mesh_message_deliveries = - if topic_stats.mesh_message_deliveries + 1f64 > cap { - cap - } else { - topic_stats.mesh_message_deliveries + 1f64 - }; } + + let cap = topic_params.mesh_message_deliveries_cap; + topic_stats.mesh_message_deliveries = + if topic_stats.mesh_message_deliveries + 1f64 > cap { + cap + } else { + topic_stats.mesh_message_deliveries + 1f64 + }; } } } } } - - /// Removes an IP list from the tracking list for a peer. - fn remove_ips(&mut self, peer_id: &PeerId, ips: Vec) { - for ip in ips { - if let Some(peer_set) = self.peer_ips.get_mut(&ip) { - peer_set.remove(peer_id); - } - } - } } -enum RejectMsg { +//TODO do we need all variants? If yes some of them are never used yet (missing something). +pub(crate) enum RejectMsg { MissingSignature, InvalidSignature, SelfOrigin, @@ -739,4 +764,5 @@ enum RejectMsg { ValidationQueueFull, ValidationThrottled, ValidationIgnored, + ValidationFailed, } diff --git a/protocols/gossipsub/src/peer_score/params.rs b/protocols/gossipsub/src/peer_score/params.rs index 49a507024ea..856185da946 100644 --- a/protocols/gossipsub/src/peer_score/params.rs +++ b/protocols/gossipsub/src/peer_score/params.rs @@ -6,7 +6,7 @@ use std::time::Duration; /// The default number of seconds for a decay interval. 
const DEFAULT_DECAY_INTERVAL: u64 = 1; /// The default rate to decay to 0. -const DEFAULT_DECAY_TO_ZERO: f64 = 0.01; +const DEFAULT_DECAY_TO_ZERO: f64 = 0.1; // TODO: Adjust these defaults impl Default for TopicScoreParams { @@ -32,7 +32,7 @@ impl Default for TopicScoreParams { mesh_failure_penalty_weight: -1.0, mesh_failure_penalty_decay: 0.5, // P4 - invalid_message_deliveries_weight: 1.0, + invalid_message_deliveries_weight: -1.0, invalid_message_deliveries_decay: 0.3, } } @@ -61,8 +61,8 @@ impl Default for PeerScoreParams { ip_colocation_factor_whitelist: HashSet::new(), behaviour_penalty_weight: -10.0, behaviour_penalty_decay: 0.2, - decay_interval: Duration::from_secs(1), - decay_to_zero: 0.1, + decay_interval: Duration::from_secs(DEFAULT_DECAY_INTERVAL), + decay_to_zero: DEFAULT_DECAY_TO_ZERO, retain_score: Duration::from_secs(3600), } } @@ -70,7 +70,7 @@ impl Default for PeerScoreParams { /// Computes the decay factor for a parameter, assuming the `decay_interval` is 1s /// and that the value decays to zero if it drops below 0.01. -fn score_parameter_decay(decay: Duration) -> f64 { +pub fn score_parameter_decay(decay: Duration) -> f64 { return score_parameter_decay_with_base( decay, Duration::from_secs(DEFAULT_DECAY_INTERVAL), @@ -79,7 +79,7 @@ fn score_parameter_decay(decay: Duration) -> f64 { } /// Computes the decay factor for a parameter using base as the `decay_interval`. -fn score_parameter_decay_with_base(decay: Duration, base: Duration, decay_to_zero: f64) -> f64 { +pub fn score_parameter_decay_with_base(decay: Duration, base: Duration, decay_to_zero: f64) -> f64 { // the decay is linear, so after n ticks the value is factor^n // so factor^n = decay_to_zero => factor = decay_to_zero^(1/n) let ticks = decay.as_secs_f64() / base.as_secs_f64(); @@ -131,7 +131,7 @@ impl PeerScoreThresholds { } #[derive(Debug, Clone)] -pub(crate) struct PeerScoreParams { +pub struct PeerScoreParams { /// Score parameters per topic. pub topics: HashMap, @@ -140,8 +140,6 @@ pub(crate) struct PeerScoreParams { pub topic_score_cap: f64, /// P5: Application-specific peer scoring - //TODO: Add in - // pub app_specific_score func(p peer.ID) f64 pub app_specific_weight: f64, /// P6: IP-colocation factor. @@ -230,7 +228,7 @@ impl PeerScoreParams { } #[derive(Debug, Clone)] -pub(crate) struct TopicScoreParams { +pub struct TopicScoreParams { /// The weight of the topic. 
pub topic_weight: f64, @@ -301,9 +299,6 @@ impl TopicScoreParams { if self.time_in_mesh_weight < 0f64 { return Err("Invalid time_in_mesh_weight; must be positive (or 0 to disable)"); } - if self.time_in_mesh_weight != 0f64 { - return Err("Invalid time_in_mesh_quantum; must be positive"); - } if self.time_in_mesh_weight != 0f64 && self.time_in_mesh_cap <= 0f64 { return Err("Invalid time_in_mesh_cap must be positive"); } diff --git a/protocols/gossipsub/src/peer_score/tests.rs b/protocols/gossipsub/src/peer_score/tests.rs index 6fd6a3fc57d..e5750b58d3e 100644 --- a/protocols/gossipsub/src/peer_score/tests.rs +++ b/protocols/gossipsub/src/peer_score/tests.rs @@ -60,7 +60,7 @@ fn test_score_time_in_mesh() { let mut peer_score = PeerScore::new(params, default_message_id()); // Peer score should start at 0 - peer_score.add_peer(peer_id.clone(), Vec::new()); + peer_score.add_peer(peer_id.clone()); let score = peer_score.score(&peer_id); assert!( @@ -106,7 +106,7 @@ fn test_score_time_in_mesh_cap() { let mut peer_score = PeerScore::new(params, default_message_id()); // Peer score should start at 0 - peer_score.add_peer(peer_id.clone(), Vec::new()); + peer_score.add_peer(peer_id.clone()); let score = peer_score.score(&peer_id); assert!( @@ -155,7 +155,7 @@ fn test_score_first_message_deliveries() { let mut peer_score = PeerScore::new(params, default_message_id()); // Peer score should start at 0 - peer_score.add_peer(peer_id.clone(), Vec::new()); + peer_score.add_peer(peer_id.clone()); peer_score.graft(&peer_id, topic); // deliver a bunch of messages from the peer @@ -199,7 +199,7 @@ fn test_score_first_message_deliveries_cap() { let mut peer_score = PeerScore::new(params, default_message_id()); // Peer score should start at 0 - peer_score.add_peer(peer_id.clone(), Vec::new()); + peer_score.add_peer(peer_id.clone()); peer_score.graft(&peer_id, topic); // deliver a bunch of messages from the peer @@ -240,7 +240,7 @@ fn test_score_first_message_deliveries_decay() { params.topics.insert(topic_hash, topic_params.clone()); let peer_id = PeerId::random(); let mut peer_score = PeerScore::new(params, default_message_id()); - peer_score.add_peer(peer_id.clone(), Vec::new()); + peer_score.add_peer(peer_id.clone()); peer_score.graft(&peer_id, topic); // deliver a bunch of messages from the peer @@ -313,7 +313,7 @@ fn test_score_mesh_message_deliveries() { let peers = vec![peer_id_a.clone(), peer_id_b.clone(), peer_id_c.clone()]; for peer_id in &peers { - peer_score.add_peer(peer_id.clone(), Vec::new()); + peer_score.add_peer(peer_id.clone()); peer_score.graft(&peer_id, topic.clone()); }
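+    // add_peer no longer takes the peer's known IPs; IP tracking is now handled
+    // separately via add_ip/remove_ip on PeerScore.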