Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gossipsub v1.1 bug fixes #71

Merged
merged 4 commits into from
Oct 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 70 additions & 33 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -783,11 +783,13 @@ impl<T: Clone + Into<Vec<u8>> + From<Vec<u8>> + AsRef<[u8]>> GenericGossipsub<T>
topic_hash
);

// remove explicit peers and peers with negative scores
// remove explicit peers, peers with negative scores, and backoffed peers
peers = peers
.into_iter()
.filter(|p| {
!self.explicit_peers.contains(p) && !self.score_below_threshold(p, |_| 0.0).0
!self.explicit_peers.contains(p)
&& !self.score_below_threshold(p, |_| 0.0).0
&& !self.backoffs.is_backoff_with_slack(topic_hash, p)
})
.collect();

Expand Down Expand Up @@ -819,6 +821,7 @@ impl<T: Clone + Into<Vec<u8>> + From<Vec<u8>> + AsRef<[u8]>> GenericGossipsub<T>
!added_peers.contains(peer)
&& !self.explicit_peers.contains(peer)
&& !self.score_below_threshold(peer, |_| 0.0).0
&& !self.backoffs.is_backoff_with_slack(topic_hash, peer)
},
);
added_peers.extend(new_peers.clone());
Expand Down Expand Up @@ -942,7 +945,15 @@ impl<T: Clone + Into<Vec<u8>> + From<Vec<u8>> + AsRef<[u8]>> GenericGossipsub<T>
peer_id: &PeerId,
threshold: impl Fn(&PeerScoreThresholds) -> f64,
) -> (bool, f64) {
if let Some((peer_score, thresholds, ..)) = &self.peer_score {
Self::score_below_threshold_from_scores(&self.peer_score, peer_id, threshold)
}

fn score_below_threshold_from_scores(
peer_score: &Option<(PeerScore, PeerScoreThresholds, Interval, GossipPromises)>,
peer_id: &PeerId,
threshold: impl Fn(&PeerScoreThresholds) -> f64,
) -> (bool, f64) {
if let Some((peer_score, thresholds, ..)) = peer_score {
let score = peer_score.score(peer_id);
if score < threshold(thresholds) {
return (true, score);
Expand Down Expand Up @@ -1242,6 +1253,41 @@ impl<T: Clone + Into<Vec<u8>> + From<Vec<u8>> + AsRef<[u8]>> GenericGossipsub<T>
debug!("Completed GRAFT handling for peer: {}", peer_id);
}

fn remove_peer_from_mesh(
&mut self,
peer_id: &PeerId,
topic_hash: &TopicHash,
backoff: Option<u64>,
always_update_backoff: bool,
) {
let mut update_backoff = always_update_backoff;
if let Some(peers) = self.mesh.get_mut(&topic_hash) {
// remove the peer if it exists in the mesh
if peers.remove(peer_id) {
info!(
"PRUNE: Removing peer: {} from the mesh for topic: {}",
peer_id.to_string(),
topic_hash
);

if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.prune(peer_id, topic_hash.clone());
}

update_backoff = true;
}
}
if update_backoff {
let time = if let Some(backoff) = backoff {
Duration::from_secs(backoff)
} else {
self.config.prune_backoff()
};
// is there a backoff specified by the peer? if so obey it.
self.backoffs.update_backoff(&topic_hash, peer_id, time);
}
}

/// Handles PRUNE control messages. Removes peer from the mesh.
fn handle_prune(
&mut self,
Expand All @@ -1252,31 +1298,9 @@ impl<T: Clone + Into<Vec<u8>> + From<Vec<u8>> + AsRef<[u8]>> GenericGossipsub<T>
let (below_threshold, score) =
self.score_below_threshold(peer_id, |pst| pst.accept_px_threshold);
for (topic_hash, px, backoff) in prune_data {
if let Some(peers) = self.mesh.get_mut(&topic_hash) {
// remove the peer if it exists in the mesh
if peers.remove(peer_id) {
info!(
"PRUNE: Removing peer: {} from the mesh for topic: {}",
peer_id.to_string(),
topic_hash
);
}

if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.prune(peer_id, topic_hash.clone());
}

// is there a backoff specified by the peer? if so obey it.
self.backoffs.update_backoff(
&topic_hash,
peer_id,
if let Some(backoff) = backoff {
Duration::from_secs(backoff)
} else {
self.config.prune_backoff()
},
);
self.remove_peer_from_mesh(peer_id, &topic_hash, backoff, true);

if self.mesh.contains_key(&topic_hash) {
//connect to px peers
if !px.is_empty() {
// we ignore PX from peers with insufficient score
Expand Down Expand Up @@ -1503,6 +1527,9 @@ impl<T: Clone + Into<Vec<u8>> + From<Vec<u8>> + AsRef<[u8]>> GenericGossipsub<T>
subscriptions,
propagation_source.to_string()
);

let mut unsubscribed_peers = Vec::new();

let subscribed_topics = match self.peer_topics.get_mut(propagation_source) {
Some(topics) => topics,
None => {
Expand Down Expand Up @@ -1547,6 +1574,15 @@ impl<T: Clone + Into<Vec<u8>> + From<Vec<u8>> + AsRef<[u8]>> GenericGossipsub<T>
Some(PeerKind::Gossipsub) => true,
_ => false,
}
&& !Self::score_below_threshold_from_scores(
&self.peer_score,
propagation_source,
|_| 0.0,
)
.0
&& !self
.backoffs
.is_backoff_with_slack(&subscription.topic_hash, propagation_source)
{
if let Some(peers) = self.mesh.get_mut(&subscription.topic_hash) {
if peers.len() < self.config.mesh_n_low() {
Expand Down Expand Up @@ -1593,12 +1629,8 @@ impl<T: Clone + Into<Vec<u8>> + From<Vec<u8>> + AsRef<[u8]>> GenericGossipsub<T>
}
// remove topic from the peer_topics mapping
subscribed_topics.remove(&subscription.topic_hash);
// remove the peer from the mesh if it exists
if let Some(peers) = self.mesh.get_mut(&subscription.topic_hash) {
peers.remove(propagation_source);
// the peer requested the unsubscription so we don't need to send a PRUNE.
}

unsubscribed_peers
.push((propagation_source.clone(), subscription.topic_hash.clone()));
// generate an unsubscribe event to be polled
application_event.push(NetworkBehaviourAction::GenerateEvent(
GenericGossipsubEvent::Unsubscribed {
Expand All @@ -1610,6 +1642,11 @@ impl<T: Clone + Into<Vec<u8>> + From<Vec<u8>> + AsRef<[u8]>> GenericGossipsub<T>
}
}

// remove unsubscribed peers from the mesh if it exists
for (peer_id, topic_hash) in unsubscribed_peers {
self.remove_peer_from_mesh(&peer_id, &topic_hash, None, false);
}

// If we need to send grafts to peer, do so immediately, rather than waiting for the
// heartbeat.
if !grafts.is_empty() {
Expand Down
68 changes: 68 additions & 0 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4902,4 +4902,72 @@ mod tests {
assert_eq!(counters.slow_counter, 1);
assert_eq!(counters.from_counter, 1);
}

#[test]
fn test_subscribe_and_graft_with_negative_score() {
//simulate a communication between two gossipsub instances
let (mut gs1, _, topic_hashes) =
build_and_inject_nodes_with_config_and_explicit_and_outbound_and_scoring(
0,
vec!["test".into()],
false,
GossipsubConfig::default(),
0,
0,
Some((PeerScoreParams::default(), PeerScoreThresholds::default())),
);
let (mut gs2, _, _) = build_and_inject_nodes(0, vec![], false);

let connection_id = ConnectionId::new(0);

let topic = Topic::new("test");

let p2 = add_peer(&mut gs1, &Vec::new(), true, false);
let p1 = add_peer(&mut gs2, &topic_hashes, false, false);

//add penalty to peer p2
gs1.peer_score.as_mut().unwrap().0.add_penalty(&p2, 1);

let original_score = gs1.peer_score.as_ref().unwrap().0.score(&p2);

//subscribe to topic in gs2
gs2.subscribe(&topic).unwrap();

let forward_messages_to_p1 = |gs1: &mut Gossipsub, gs2: &mut Gossipsub| {
//collect messages to p1
let messages_to_p1 = gs2.events.drain(..).filter_map(|e| match e {
NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => {
if &peer_id == &p1 {
Some(event)
} else {
None
}
}
_ => None,
});
for message in messages_to_p1 {
gs1.inject_event(
p2.clone(),
connection_id,
HandlerEvent::Message {
rpc: proto_to_message(&message),
invalid_messages: vec![],
},
);
}
};

//forward the subscribe message
forward_messages_to_p1(&mut gs1, &mut gs2);

//heartbeats on both
gs1.heartbeat();
gs2.heartbeat();

//forward messages again
forward_messages_to_p1(&mut gs1, &mut gs2);

//nobody got penalized
assert!(gs1.peer_score.as_ref().unwrap().0.score(&p2) >= original_score);
}
}