From b169c666090ef550e5bb51abb2964ef995164ced Mon Sep 17 00:00:00 2001 From: Age Manning Date: Mon, 5 Oct 2020 15:36:21 +1100 Subject: [PATCH 1/3] Allow helper functions - remove warnings --- protocols/gossipsub/src/mcache.rs | 1 + protocols/gossipsub/src/peer_score/mod.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/protocols/gossipsub/src/mcache.rs b/protocols/gossipsub/src/mcache.rs index fa1cc0b8f12..4ec75ccc4af 100644 --- a/protocols/gossipsub/src/mcache.rs +++ b/protocols/gossipsub/src/mcache.rs @@ -88,6 +88,7 @@ impl MessageCache { } /// Get a message with `message_id` + #[allow(dead_code)] pub fn get(&self, message_id: &MessageId) -> Option<&GossipsubMessage> { self.msgs.get(message_id) } diff --git a/protocols/gossipsub/src/peer_score/mod.rs b/protocols/gossipsub/src/peer_score/mod.rs index feffba13d53..ba8a6e26b40 100644 --- a/protocols/gossipsub/src/peer_score/mod.rs +++ b/protocols/gossipsub/src/peer_score/mod.rs @@ -199,6 +199,7 @@ impl Default for DeliveryRecord { impl PeerScore { /// Creates a new `PeerScore` using a given set of peer scoring parameters. + #[allow(dead_code)] pub fn new(params: PeerScoreParams, msg_id: fn(&GossipsubMessage) -> MessageId) -> Self { Self::new_with_message_delivery_time_callback(params, msg_id, None) } From 5c9a137b6710621efbf9277487eb1f891b899ce9 Mon Sep 17 00:00:00 2001 From: blacktemplar Date: Wed, 7 Oct 2020 12:35:39 +0200 Subject: [PATCH 2/3] don't graft to negative scored peers after subscription --- protocols/gossipsub/src/behaviour.rs | 16 ++++- protocols/gossipsub/src/behaviour/tests.rs | 68 ++++++++++++++++++++++ 2 files changed, 83 insertions(+), 1 deletion(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index b1fb8b16191..5de496a7501 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -937,7 +937,15 @@ impl Gossipsub { peer_id: &PeerId, threshold: impl Fn(&PeerScoreThresholds) -> f64, ) -> (bool, f64) { - if let Some((peer_score, thresholds, ..)) = &self.peer_score { + Self::score_below_threshold_from_scores(&self.peer_score, peer_id, threshold) + } + + fn score_below_threshold_from_scores( + peer_score: &Option<(PeerScore, PeerScoreThresholds, Interval, GossipPromises)>, + peer_id: &PeerId, + threshold: impl Fn(&PeerScoreThresholds) -> f64, + ) -> (bool, f64) { + if let Some((peer_score, thresholds, ..)) = peer_score { let score = peer_score.score(peer_id); if score < threshold(thresholds) { return (true, score); @@ -1500,6 +1508,12 @@ impl Gossipsub { Some(PeerKind::Gossipsub) => true, _ => false, } + && !Self::score_below_threshold_from_scores( + &self.peer_score, + propagation_source, + |_| 0.0, + ) + .0 { if let Some(peers) = self.mesh.get_mut(&subscription.topic_hash) { if peers.len() < self.config.mesh_n_low() { diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 02a042b5bed..21e30d7a53a 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -4748,4 +4748,72 @@ mod tests { "Expected all_peers to contain all peers." ); } + + #[test] + fn test_subscribe_and_graft_with_negative_score() { + //simulate a communication between two gossipsub instances + let (mut gs1, _, topic_hashes) = + build_and_inject_nodes_with_config_and_explicit_and_outbound_and_scoring( + 0, + vec!["test".into()], + false, + GossipsubConfig::default(), + 0, + 0, + Some((PeerScoreParams::default(), PeerScoreThresholds::default())), + ); + let (mut gs2, _, _) = build_and_inject_nodes(0, vec![], false); + + let connection_id = ConnectionId::new(0); + + let topic = Topic::new("test"); + + let p2 = add_peer(&mut gs1, &Vec::new(), true, false); + let p1 = add_peer(&mut gs2, &topic_hashes, false, false); + + //add penalty to peer p2 + gs1.peer_score.as_mut().unwrap().0.add_penalty(&p2, 1); + + let original_score = gs1.peer_score.as_ref().unwrap().0.score(&p2); + + //subscribe to topic in gs2 + gs2.subscribe(&topic).unwrap(); + + let forward_messages_to_p1 = |gs1: &mut Gossipsub, gs2: &mut Gossipsub| { + //collect messages to p1 + let messages_to_p1 = gs2.events.drain(..).filter_map(|e| match e { + NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => { + if &peer_id == &p1 { + Some(event) + } else { + None + } + } + _ => None, + }); + for message in messages_to_p1 { + gs1.inject_event( + p2.clone(), + connection_id, + HandlerEvent::Message { + rpc: proto_to_message(&message), + invalid_messages: vec![], + }, + ); + } + }; + + //forward the subscribe message + forward_messages_to_p1(&mut gs1, &mut gs2); + + //heartbeats on both + gs1.heartbeat(); + gs2.heartbeat(); + + //forward messages again + forward_messages_to_p1(&mut gs1, &mut gs2); + + //nobody got penalized + assert!(gs1.peer_score.as_ref().unwrap().0.score(&p2) >= original_score); + } } From dbe9b06c70e01f7e74f14c053a691b766d6358b2 Mon Sep 17 00:00:00 2001 From: blacktemplar Date: Wed, 7 Oct 2020 13:24:35 +0200 Subject: [PATCH 3/3] improve backoff handling --- protocols/gossipsub/src/behaviour.rs | 87 ++++++++++++++++++---------- 1 file changed, 55 insertions(+), 32 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 5de496a7501..fac113a432d 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -778,11 +778,13 @@ impl Gossipsub { topic_hash ); - // remove explicit peers and peers with negative scores + // remove explicit peers, peers with negative scores, and backoffed peers peers = peers .into_iter() .filter(|p| { - !self.explicit_peers.contains(p) && !self.score_below_threshold(p, |_| 0.0).0 + !self.explicit_peers.contains(p) + && !self.score_below_threshold(p, |_| 0.0).0 + && !self.backoffs.is_backoff_with_slack(topic_hash, p) }) .collect(); @@ -814,6 +816,7 @@ impl Gossipsub { !added_peers.contains(peer) && !self.explicit_peers.contains(peer) && !self.score_below_threshold(peer, |_| 0.0).0 + && !self.backoffs.is_backoff_with_slack(topic_hash, peer) }, ); added_peers.extend(new_peers.clone()); @@ -1242,6 +1245,41 @@ impl Gossipsub { debug!("Completed GRAFT handling for peer: {}", peer_id); } + fn remove_peer_from_mesh( + &mut self, + peer_id: &PeerId, + topic_hash: &TopicHash, + backoff: Option, + always_update_backoff: bool, + ) { + let mut update_backoff = always_update_backoff; + if let Some(peers) = self.mesh.get_mut(&topic_hash) { + // remove the peer if it exists in the mesh + if peers.remove(peer_id) { + info!( + "PRUNE: Removing peer: {} from the mesh for topic: {}", + peer_id.to_string(), + topic_hash + ); + + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.prune(peer_id, topic_hash.clone()); + } + + update_backoff = true; + } + } + if update_backoff { + let time = if let Some(backoff) = backoff { + Duration::from_secs(backoff) + } else { + self.config.prune_backoff() + }; + // is there a backoff specified by the peer? if so obey it. + self.backoffs.update_backoff(&topic_hash, peer_id, time); + } + } + /// Handles PRUNE control messages. Removes peer from the mesh. fn handle_prune( &mut self, @@ -1252,31 +1290,9 @@ impl Gossipsub { let (below_threshold, score) = self.score_below_threshold(peer_id, |pst| pst.accept_px_threshold); for (topic_hash, px, backoff) in prune_data { - if let Some(peers) = self.mesh.get_mut(&topic_hash) { - // remove the peer if it exists in the mesh - if peers.remove(peer_id) { - info!( - "PRUNE: Removing peer: {} from the mesh for topic: {}", - peer_id.to_string(), - topic_hash - ); - } - - if let Some((peer_score, ..)) = &mut self.peer_score { - peer_score.prune(peer_id, topic_hash.clone()); - } - - // is there a backoff specified by the peer? if so obey it. - self.backoffs.update_backoff( - &topic_hash, - peer_id, - if let Some(backoff) = backoff { - Duration::from_secs(backoff) - } else { - self.config.prune_backoff() - }, - ); + self.remove_peer_from_mesh(peer_id, &topic_hash, backoff, true); + if self.mesh.contains_key(&topic_hash) { //connect to px peers if !px.is_empty() { // we ignore PX from peers with insufficient score @@ -1464,6 +1480,9 @@ impl Gossipsub { subscriptions, propagation_source.to_string() ); + + let mut unsubscribed_peers = Vec::new(); + let subscribed_topics = match self.peer_topics.get_mut(propagation_source) { Some(topics) => topics, None => { @@ -1514,6 +1533,9 @@ impl Gossipsub { |_| 0.0, ) .0 + && !self + .backoffs + .is_backoff_with_slack(&subscription.topic_hash, propagation_source) { if let Some(peers) = self.mesh.get_mut(&subscription.topic_hash) { if peers.len() < self.config.mesh_n_low() { @@ -1560,12 +1582,8 @@ impl Gossipsub { } // remove topic from the peer_topics mapping subscribed_topics.remove(&subscription.topic_hash); - // remove the peer from the mesh if it exists - if let Some(peers) = self.mesh.get_mut(&subscription.topic_hash) { - peers.remove(propagation_source); - // the peer requested the unsubscription so we don't need to send a PRUNE. - } - + unsubscribed_peers + .push((propagation_source.clone(), subscription.topic_hash.clone())); // generate an unsubscribe event to be polled application_event.push(NetworkBehaviourAction::GenerateEvent( GossipsubEvent::Unsubscribed { @@ -1577,6 +1595,11 @@ impl Gossipsub { } } + // remove unsubscribed peers from the mesh if it exists + for (peer_id, topic_hash) in unsubscribed_peers { + self.remove_peer_from_mesh(&peer_id, &topic_hash, None, false); + } + // If we need to send grafts to peer, do so immediately, rather than waiting for the // heartbeat. if !grafts.is_empty() {