Various improvements #60

Merged · 13 commits · Sep 25, 2020
131 changes: 101 additions & 30 deletions protocols/gossipsub/src/behaviour.rs
@@ -336,24 +336,43 @@ impl Gossipsub {
self.mesh.keys()
}

-/// Lists peers for a certain topic hash.
-pub fn peers(&self, topic_hash: &TopicHash) -> impl Iterator<Item = &PeerId> {
+/// Lists all mesh peers for a certain topic hash.
+pub fn mesh_peers(&self, topic_hash: &TopicHash) -> impl Iterator<Item = &PeerId> {
self.mesh
.get(topic_hash)
.into_iter()
.map(|x| x.into_iter())
.flatten()
}

-/// Lists all peers for any topic.
-pub fn all_peers(&self) -> impl Iterator<Item = &PeerId> {
+/// Lists all mesh peers for all topics.
+pub fn all_mesh_peers(&self) -> impl Iterator<Item = &PeerId> {
let mut res = BTreeSet::new();
for peers in self.mesh.values() {
res.extend(peers);
}
res.into_iter()
}

+/// Lists all known peers and their associated subscribed topics.
+pub fn all_peers(&self) -> impl Iterator<Item = (&PeerId, Vec<&TopicHash>)> {
+    self.peer_topics
+        .iter()
+        .map(|(peer_id, topic_set)| (peer_id, topic_set.iter().collect()))
+}
+
+/// Lists all known peers and their associated protocol.
+pub fn peer_protocol(&self) -> impl Iterator<Item = (&PeerId, &PeerKind)> {
+    self.peer_protocols.iter()
+}
+
+/// Returns the gossipsub score for a given peer, if one exists.
+pub fn peer_score(&self, peer_id: &PeerId) -> Option<f64> {
+    self.peer_score
+        .as_ref()
+        .map(|(score, ..)| score.score(peer_id))
+}
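A minimal usage sketch of the new inspection API (assuming an already-constructed `gs: Gossipsub`, a subscribed `topic`, and a `peer_id` in scope; those bindings are illustrative, not part of this diff):

    // Mesh peers for one topic (this PR renames `peers` to `mesh_peers`).
    for peer in gs.mesh_peers(&topic.hash()) {
        println!("mesh peer: {}", peer);
    }

    // Every known peer together with the topics it subscribes to.
    for (peer, topics) in gs.all_peers() {
        println!("{} subscribes to {:?}", peer, topics);
    }

    // Per-peer score; `None` until peer scoring is activated.
    if let Some(score) = gs.peer_score(&peer_id) {
        println!("score for {}: {}", peer_id, score);
    }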

/// Subscribe to a topic.
///
/// Returns true if the subscription worked. Returns false if we were already subscribed.
@@ -440,7 +459,7 @@ impl Gossipsub {
&mut self,
topic: Topic<H>,
data: impl Into<Vec<u8>>,
-) -> Result<(), PublishError> {
+) -> Result<MessageId, PublishError> {
self.publish_many(iter::once(topic), data)
}

@@ -449,7 +468,7 @@ impl Gossipsub {
&mut self,
topics: impl IntoIterator<Item = Topic<H>>,
data: impl Into<Vec<u8>>,
-) -> Result<(), PublishError> {
+) -> Result<MessageId, PublishError> {
let message =
self.build_message(topics.into_iter().map(|t| t.hash()).collect(), data.into())?;
let msg_id = (self.config.message_id_fn())(&message);
@@ -575,8 +594,8 @@ impl Gossipsub {
self.send_message(peer_id.clone(), event.clone())?;
}

-info!("Published message: {:?}", msg_id);
-Ok(())
+info!("Published message: {:?}", &msg_id);
+Ok(msg_id)
}
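With this change, `publish` and `publish_many` return the `MessageId` of the published message instead of `()`. A hedged sketch of a caller keeping the id (the `gs` and `topic` bindings are assumed to exist in scope):

    // Sketch only: the returned id can be correlated with later
    // validation results or with IHAVE/IWANT traffic in the logs.
    match gs.publish(topic.clone(), b"hello world".to_vec()) {
        Ok(msg_id) => println!("published with id {:?}", msg_id),
        Err(e) => eprintln!("publish failed: {:?}", e),
    }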

/// This function should be called when `config.validate_messages()` is `true` after the
@@ -669,17 +688,33 @@ impl Gossipsub {

/// Activates the peer scoring system with the given parameters. This will reset all scores
/// if there was already another peer scoring system activated. Returns an error if the
-/// params are not valid.
+/// params are not valid or if they were already set.
pub fn with_peer_score(
&mut self,
params: PeerScoreParams,
threshold: PeerScoreThresholds,
) -> Result<(), String> {
+    self.with_peer_score_and_message_delivery_time_callback(params, threshold, None)
+}
+
+/// Activates the peer scoring system with the given parameters and a message delivery time
+/// callback. Returns an error if the parameters were already set.
+pub fn with_peer_score_and_message_delivery_time_callback(
+    &mut self,
+    params: PeerScoreParams,
+    threshold: PeerScoreThresholds,
+    callback: Option<fn(&PeerId, &TopicHash, f64)>,
+) -> Result<(), String> {
params.validate()?;
threshold.validate()?;

+if self.peer_score.is_some() {
+    return Err("Peer score set twice".into());
+}
+
let interval = Interval::new(params.decay_interval);
-let peer_score = PeerScore::new(params, self.config.message_id_fn());
+let peer_score =
+    PeerScore::new_with_message_delivery_time_callback(params, self.config.message_id_fn(), callback);
self.peer_score = Some((peer_score, threshold, interval, GossipPromises::default()));
Ok(())
}
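A sketch of activating scoring through the new entry point. The default params/thresholds and the callback body are illustrative; note the callback is a plain `fn` pointer, so only non-capturing closures coerce to it:

    let params = PeerScoreParams::default();
    let thresholds = PeerScoreThresholds::default();
    gs.with_peer_score_and_message_delivery_time_callback(
        params,
        thresholds,
        Some(|peer: &PeerId, topic: &TopicHash, delivery_time: f64| {
            println!("delivery by {} on {:?}: {}", peer, topic, delivery_time);
        }),
    )
    .expect("peer scoring can only be activated once");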
@@ -950,7 +985,7 @@ impl Gossipsub {
}

for id in ids {
-if self.mcache.get(&id).is_none() {
+if !self.duplication_cache.contains(&id) {
// have not seen this message, request it
iwant_ids.insert(id);
}
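The IWANT decision above now consults the duplication cache ("seen recently?") rather than the memcache ("still in the gossip window?"), so messages that have aged out of the gossip window are no longer re-requested. A self-contained toy stand-in for that seen-cache idea (the real `duplication_cache` is a time-based cache; this version is illustrative only):

    use std::collections::HashMap;
    use std::time::{Duration, Instant};

    /// Toy seen-cache: remembers ids for a fixed TTL.
    struct SeenCache {
        ttl: Duration,
        seen: HashMap<String, Instant>,
    }

    impl SeenCache {
        fn new(ttl: Duration) -> Self {
            SeenCache { ttl, seen: HashMap::new() }
        }

        fn insert(&mut self, id: String) {
            self.seen.insert(id, Instant::now());
        }

        fn contains(&mut self, id: &str) -> bool {
            let now = Instant::now();
            // Drop expired entries, then check membership.
            self.seen.retain(|_, t| now.duration_since(*t) < self.ttl);
            self.seen.contains_key(id)
        }
    }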
@@ -988,6 +1023,11 @@ impl Gossipsub {
Instant::now() + self.config.iwant_followup_time(),
);
}
+debug!(
+    "IHAVE: Asking for the following messages from {}: {:?}",
+    peer_id,
+    message_ids
+);

Self::control_pool_add(
&mut self.control_pool,
@@ -1072,6 +1112,10 @@ impl Gossipsub {
if let Some(peers) = self.mesh.get_mut(&topic_hash) {
// if the peer is already in the mesh ignore the graft
if peers.contains(peer_id) {
+debug!(
+    "GRAFT: Received graft for peer {:?} that is already in topic {:?}",
+    peer_id, &topic_hash
+);
continue;
}

@@ -1141,6 +1185,10 @@ impl Gossipsub {
} else {
// don't do PX when there is an unknown topic to avoid leaking our peers
do_px = false;
+debug!(
+    "GRAFT: Received graft for unknown topic {:?} from peer {:?}",
+    &topic_hash, peer_id
+);
// spam hardening: ignore GRAFTs for unknown topics
continue;
}
@@ -1290,6 +1338,11 @@ impl Gossipsub {
// Also reject any message that originated from a blacklisted peer
if let Some(source) = msg.source.as_ref() {
if self.blacklisted_peers.contains(source) {
+debug!(
+    "Rejecting message from peer {} because of blacklisted source: {}",
+    propagation_source,
+    source
+);
if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
peer_score.reject_message(
propagation_source,
@@ -1335,6 +1388,7 @@ impl Gossipsub {
}
return;
}
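For context, a message reaches the rejection path above once its source has been blacklisted on the behaviour; a one-line sketch (assuming the crate's `blacklist_peer` method, which is not touched by this diff, and a `bad_peer_id: PeerId` in scope):

    // After this call, RPCs from the peer and messages originating
    // from it are rejected, which is what the debug! line above reports.
    gs.blacklist_peer(&bad_peer_id);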
+debug!("Put message {:?} in duplication_cache and resolve promises", &msg_id);

// Tells score that message arrived (but is maybe not fully validated yet)
// Consider message as delivered for gossip promises
@@ -1849,6 +1903,25 @@ impl Gossipsub {
}
}

+if self.peer_score.is_some() {
+    trace!("Peer_scores: {:?}", {
+        for (peer, _) in &self.peer_topics {
+            score(peer);
+        }
+        scores
+    });
+    trace!("Mesh message deliveries: {:?}", {
+        self.mesh.iter().map(|(t, peers)| {
+            (t.clone(), peers.iter().map(|p| {
+                (p.clone(),
+                 peer_score.as_ref().expect("peer_score.is_some()").0
+                     .mesh_message_deliveries(p, t)
+                     .unwrap_or(0.0))
+            }).collect::<HashMap<PeerId, f64>>())
+        }).collect::<HashMap<TopicHash, HashMap<PeerId, f64>>>()
+    })
+}
+
self.emit_gossip();

// send graft/prunes
@@ -1863,9 +1936,7 @@ impl Gossipsub {
self.mcache.shift();

debug!("Completed Heartbeat");
-if self.peer_score.is_some() {
-    trace!("Peer_scores: {:?}", scores);
-}

}

/// Emits gossip - Send IHAVE messages to a random set of gossip peers. This is applied to mesh
@@ -2646,6 +2717,22 @@ impl NetworkBehaviour for Gossipsub {
rpc,
invalid_messages,
} => {
+// Handle the gossipsub RPC
+
+// Handle subscriptions
+// Update connected peers topics
+if !rpc.subscriptions.is_empty() {
+    self.handle_received_subscriptions(&rpc.subscriptions, &propagation_source);
+}
+
+// Check if peer is graylisted in which case we ignore the event
+if let (true, _) =
+    self.score_below_threshold(&propagation_source, |pst| pst.graylist_threshold)
+{
+    debug!("RPC Dropped from greylisted peer {}", propagation_source);
+    return;
+}
+
// Handle any invalid messages from this peer
if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
let id_fn = self.config.message_id_fn();
@@ -2666,22 +2753,6 @@ impl NetworkBehaviour for Gossipsub {
}
}

-// Handle the Gossipsub RPC
-
-// Check if peer is graylisted in which case we ignore the event
-if let (true, _) =
-    self.score_below_threshold(&propagation_source, |pst| pst.graylist_threshold)
-{
-    debug!("RPC Dropped from greylisted peer {}", propagation_source);
-    return;
-}
-
-// Handle subscriptions
-// Update connected peers topics
-if !rpc.subscriptions.is_empty() {
-    self.handle_received_subscriptions(&rpc.subscriptions, &propagation_source);
-}
-
// Handle messages
for message in rpc.messages {
self.handle_received_message(message, &propagation_source);
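The reorder above means subscriptions are handled before the graylist check, so a `Subscribed` event is still emitted for a graylisted peer while its messages and control traffic are dropped (the test change below exercises exactly this). A self-contained sketch of the new ordering, with illustrative stand-in types rather than the crate's real ones:

    struct Rpc {
        subscriptions: Vec<String>,
        messages: Vec<String>,
    }

    fn handle_rpc(rpc: Rpc, graylisted: bool) -> Vec<String> {
        let mut events = Vec::new();
        // 1. Subscriptions are processed first, unconditionally.
        for s in rpc.subscriptions {
            events.push(format!("subscribed: {}", s));
        }
        // 2. Graylisted peers are dropped before any message handling.
        if graylisted {
            return events;
        }
        for m in rpc.messages {
            events.push(format!("message: {}", m));
        }
        events
    }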
17 changes: 12 additions & 5 deletions protocols/gossipsub/src/behaviour/tests.rs
@@ -2732,8 +2732,15 @@ mod tests {
},
);

-//no events got processed
-assert!(gs.events.is_empty());
+// only the subscription event gets processed; the rest is dropped
+assert_eq!(gs.events.len(), 1);
+assert!(match &gs.events[0] {
+    NetworkBehaviourAction::GenerateEvent(event) => match event {
+        GossipsubEvent::Subscribed { .. } => true,
+        _ => false,
+    },
+    _ => false,
+});

let control_action = GossipsubControlAction::IHave {
topic_hash: topics[0].clone(),
@@ -2755,7 +2762,7 @@
);

//events got processed
-assert!(!gs.events.is_empty());
+assert!(gs.events.len() > 1);
}

#[test]
@@ -4728,15 +4735,15 @@
);

assert_eq!(
-gs.peers(&TopicHash::from_raw("topic1"))
+gs.mesh_peers(&TopicHash::from_raw("topic1"))
.cloned()
.collect::<BTreeSet<_>>(),
peers,
"Expected peers for a registered topic to contain all peers."
);

assert_eq!(
-gs.all_peers().cloned().collect::<BTreeSet<_>>(),
+gs.all_mesh_peers().cloned().collect::<BTreeSet<_>>(),
peers,
"Expected all_peers to contain all peers."
);
2 changes: 2 additions & 0 deletions protocols/gossipsub/src/mcache.rs
@@ -73,6 +73,7 @@ impl MessageCache {
/// Returns the message if it already exists.
pub fn put(&mut self, msg: GossipsubMessage) -> Option<GossipsubMessage> {
let message_id = (self.msg_id)(&msg);
+debug!("Put message {:?} in mcache", &message_id);
let cache_entry = CacheEntry {
mid: message_id.clone(),
topics: msg.topics.clone(),
@@ -163,6 +164,7 @@ impl MessageCache {
);
}
}
+debug!("Remove message from the cache: {}", &entry.mid);

self.iwant_counts.remove(&entry.mid);
}
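The two new debug! lines bracket a cache entry's lifetime: one fires on insertion via `put`, the other when the oldest heartbeat window is aged out. A toy sketch of that sliding-window behaviour (illustrative types, not the crate's real `MessageCache`):

    use std::collections::VecDeque;

    /// Toy message cache: one bucket of ids per heartbeat window.
    struct WindowCache {
        history: VecDeque<Vec<String>>, // newest window at the front
        max_windows: usize,
    }

    impl WindowCache {
        fn new(max_windows: usize) -> Self {
            WindowCache { history: VecDeque::from(vec![Vec::new()]), max_windows }
        }

        fn put(&mut self, id: String) {
            if let Some(current) = self.history.front_mut() {
                current.push(id); // "Put message ... in mcache"
            }
        }

        /// Called once per heartbeat; the expired bucket is where the
        /// "Remove message from the cache" log would fire.
        fn shift(&mut self) {
            self.history.push_front(Vec::new());
            while self.history.len() > self.max_windows {
                if let Some(expired) = self.history.pop_back() {
                    for id in expired {
                        println!("Remove message from the cache: {}", id);
                    }
                }
            }
        }
    }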