Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gossipsub v1.1 various improvements #49

Merged
merged 11 commits into from
Aug 17, 2020
78 changes: 36 additions & 42 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ use crate::handler::{GossipsubHandler, HandlerEvent};
use crate::mcache::MessageCache;
use crate::peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason};
use crate::protocol::SIGNING_PREFIX;
use crate::rpc_proto;
use crate::time_cache::DuplicateCache;
use crate::topic::{Hasher, Topic, TopicHash};
use crate::types::{
GossipsubControlAction, GossipsubMessage, GossipsubSubscription, GossipsubSubscriptionAction,
MessageId, PeerInfo,
};
use crate::types::{GossipsubRpc, PeerKind};
use crate::{rpc_proto, TopicScoreParams};
use std::cmp::Ordering::Equal;

mod tests;
Expand Down Expand Up @@ -344,6 +344,7 @@ impl BackoffStorage {
}
}

#[derive(Debug)]
pub enum MessageAcceptance {
/// The message is considered valid, and it should be delivered and forwarded to the network
Accept,
Expand Down Expand Up @@ -686,9 +687,10 @@ impl Gossipsub {
}

/// This function should be called when `config.validate_messages()` is `true` after the
/// message got validated by the caller. Messages are stored in the ['Memcache'] and validation
/// is expected to be fast enough that the messages should still exist in the cache.There are
/// three possible validation outcomes and the outcome is given in acceptance.
/// message got validated by the caller. Messages are stored in the
/// ['Memcache'] and validation is expected to be fast enough that the messages should still
/// exist in the cache. There are three possible validation outcomes and the outcome is given
/// in acceptance.
///
/// If acceptance = Accept the message will get propagated to the network. The
/// `propagation_source` parameter indicates who the message was received by and will not
Expand All @@ -704,7 +706,7 @@ impl Gossipsub {
/// in the cache anymore.
///
/// This should only be called once per message.
pub fn validate_message(
pub fn report_message_validation_result(
&mut self,
message_id: &MessageId,
propagation_source: &PeerId,
Expand All @@ -730,14 +732,7 @@ impl Gossipsub {
};

if let Some(message) = self.mcache.remove(message_id) {
//tell peer_score and gossip promises about reject
Self::reject_message(
&mut self.peer_score,
propagation_source,
&message,
message_id,
reject_reason,
);
//tell peer_score about reject
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.reject_message(propagation_source, &message, reject_reason);
}
Expand Down Expand Up @@ -782,6 +777,12 @@ impl Gossipsub {
Ok(())
}

pub fn set_topic_params(&mut self, topic_hash: TopicHash, params: TopicScoreParams) {
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.set_topic_params(topic_hash, params);
}
}

/// Sets the application specific score for a peer. Returns true if scoring is active and
/// the peer is connected or if the score of the peer is not yet expired, false otherwise.
pub fn set_application_score(&mut self, peer_id: &PeerId, new_score: f64) -> bool {
Expand Down Expand Up @@ -1315,20 +1316,6 @@ impl Gossipsub {
}
}

/// informs peer score and gossip_promises about a rejected message
fn reject_message(
peer_score: &mut Option<(PeerScore, PeerScoreThresholds, Interval, GossipPromises)>,
from: &PeerId,
msg: &GossipsubMessage,
id: &MessageId,
reason: RejectReason,
) {
if let Some((peer_score, _, _, gossip_promises)) = peer_score {
peer_score.reject_message(from, &msg, reason);
gossip_promises.reject_message(id, &reason);
}
}

/// Handles a newly received GossipsubMessage.
/// Forwards the message to all peers in the mesh.
fn handle_received_message(&mut self, mut msg: GossipsubMessage, propagation_source: &PeerId) {
Expand All @@ -1348,18 +1335,21 @@ impl Gossipsub {

// reject messages claiming to be from ourselves but not locally published
if let Some(own_id) = self.publish_config.get_own_id() {
if own_id != propagation_source && msg.source.as_ref().map_or(false, |s| s == own_id) {
//TODO remove this "hack" as soon as lighthouse uses Anonymous instead of this fixed
// PeerId.
let lighthouse_anonymous_id = PeerId::from_bytes(vec![0, 1, 0]).expect("Valid peer id");
if own_id != &lighthouse_anonymous_id
&& own_id != propagation_source
&& msg.source.as_ref().map_or(false, |s| s == own_id)
{
debug!(
"Dropping message claiming to be from self but forwarded from {:?}",
propagation_source
);
Self::reject_message(
&mut self.peer_score,
propagation_source,
&msg,
&msg_id,
RejectReason::SelfOrigin,
"Dropping message {:?} claiming to be from self but forwarded from {:?}",
msg_id, propagation_source
);
if let Some((peer_score, _, _, gossip_promises)) = &mut self.peer_score {
peer_score.reject_message(propagation_source, &msg, RejectReason::SelfOrigin);
gossip_promises.reject_message(&msg_id, &RejectReason::SelfOrigin);
}
return;
}
}
Expand All @@ -1374,8 +1364,10 @@ impl Gossipsub {
}

//tells score that message arrived (but is maybe not fully validated yet)
if let Some((peer_score, ..)) = &mut self.peer_score {
//Consider message as delivered for gossip promises
if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
peer_score.validate_message(propagation_source, &msg);
gossip_promises.deliver_message(&msg_id);
}

self.mcache.put(msg.clone());
Expand Down Expand Up @@ -1885,6 +1877,7 @@ impl Gossipsub {
self.mcache.shift();

debug!("Completed Heartbeat");
debug!("peer_scores: {:?}", scores);
}

/// Emits gossip - Send IHAVE messages to a random set of gossip peers. This is applied to mesh
Expand Down Expand Up @@ -2032,12 +2025,11 @@ impl Gossipsub {
fn forward_msg(&mut self, message: GossipsubMessage, source: Option<&PeerId>) -> bool {
let msg_id = (self.config.message_id_fn())(&message);

//message is fully validated, inform peer_score and gossip promises
//message is fully validated inform peer_score
if let Some((peer_score, _, _, gossip_promises)) = &mut self.peer_score {
if let Some(peer) = source {
peer_score.deliver_message(peer, &message);
}
gossip_promises.deliver_message(&msg_id);
}

debug!("Forwarding message: {:?}", msg_id);
Expand All @@ -2058,7 +2050,7 @@ impl Gossipsub {
//add explicit peers
for p in &self.explicit_peers {
if let Some(topics) = self.peer_topics.get(p) {
if message.topics.iter().any(|t| topics.contains(t)) {
if Some(p) != source && message.topics.iter().any(|t| topics.contains(t)) {
recipient_peers.insert(p.clone());
}
}
Expand Down Expand Up @@ -2475,10 +2467,12 @@ impl NetworkBehaviour for Gossipsub {
invalid_messages,
} => {
// Handle any invalid messages from this peer
if let Some((peer_score, ..)) = &mut self.peer_score {
if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
let mut id_fn = self.config.message_id_fn();
for (_message, validation_error) in invalid_messages {
let reason = RejectReason::ProtocolValidationError(validation_error);
peer_score.reject_message(&propagation_source, &_message, reason);
gossip_promises.reject_message(&id_fn(&_message), &reason);
}
}

Expand Down
16 changes: 8 additions & 8 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3186,7 +3186,7 @@ mod tests {
assert_eq!(gs.peer_score.as_ref().unwrap().0.score(&peers[0]), 0.0);

//message m1 gets validated
gs.validate_message(&id(&m1), &peers[0], MessageAcceptance::Accept);
gs.report_message_validation_result(&id(&m1), &peers[0], MessageAcceptance::Accept);

assert_eq!(gs.peer_score.as_ref().unwrap().0.score(&peers[0]), 0.0);
}
Expand Down Expand Up @@ -3348,7 +3348,7 @@ mod tests {
assert_eq!(gs.peer_score.as_ref().unwrap().0.score(&peers[0]), 0.0);

//message m1 gets ignored
gs.validate_message(
gs.report_message_validation_result(
&(config.message_id_fn())(&m1),
&peers[0],
MessageAcceptance::Ignore,
Expand Down Expand Up @@ -3404,7 +3404,7 @@ mod tests {
assert_eq!(gs.peer_score.as_ref().unwrap().0.score(&peers[0]), 0.0);

//message m1 gets rejected
gs.validate_message(
gs.report_message_validation_result(
&(config.message_id_fn())(&m1),
&peers[0],
MessageAcceptance::Reject,
Expand Down Expand Up @@ -3467,7 +3467,7 @@ mod tests {
assert_eq!(gs.peer_score.as_ref().unwrap().0.score(&peers[1]), 0.0);

//message m1 gets rejected
gs.validate_message(
gs.report_message_validation_result(
&(config.message_id_fn())(&m1),
&peers[0],
MessageAcceptance::Reject,
Expand Down Expand Up @@ -3534,17 +3534,17 @@ mod tests {
assert_eq!(gs.peer_score.as_ref().unwrap().0.score(&peers[0]), 0.0);

//messages gets rejected
gs.validate_message(
gs.report_message_validation_result(
&(config.message_id_fn())(&m1),
&peers[0],
MessageAcceptance::Reject,
);
gs.validate_message(
gs.report_message_validation_result(
&(config.message_id_fn())(&m2),
&peers[0],
MessageAcceptance::Reject,
);
gs.validate_message(
gs.report_message_validation_result(
&(config.message_id_fn())(&m3),
&peers[0],
MessageAcceptance::Reject,
Expand Down Expand Up @@ -3604,7 +3604,7 @@ mod tests {
assert_eq!(gs.peer_score.as_ref().unwrap().0.score(&peers[0]), 0.0);

//message m1 gets rejected
gs.validate_message(
gs.report_message_validation_result(
&(config.message_id_fn())(&m1),
&peers[0],
MessageAcceptance::Reject,
Expand Down
6 changes: 6 additions & 0 deletions protocols/gossipsub/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,12 @@ impl Default for GossipsubConfigBuilder {
}
}

impl From<GossipsubConfig> for GossipsubConfigBuilder {
fn from(config: GossipsubConfig) -> Self {
GossipsubConfigBuilder { config }
}
}

impl GossipsubConfigBuilder {
// set default values
pub fn new() -> GossipsubConfigBuilder {
Expand Down
7 changes: 6 additions & 1 deletion protocols/gossipsub/src/gossip_promises.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::error::ValidationError;
use crate::peer_score::RejectReason;
use crate::MessageId;
use libp2p_core::PeerId;
use log::debug;
use rand::seq::SliceRandom;
use rand::thread_rng;
use std::collections::HashMap;
Expand Down Expand Up @@ -56,11 +57,15 @@ impl GossipPromises {
pub fn get_broken_promises(&mut self) -> HashMap<PeerId, usize> {
let now = Instant::now();
let mut result = HashMap::new();
self.promises.retain(|_, peers| {
self.promises.retain(|msg, peers| {
peers.retain(|peer_id, expires| {
if *expires < now {
let count = result.entry(peer_id.clone()).or_insert(0);
*count += 1;
debug!(
"The peer {} broke the promise to deliver message {} in time!",
peer_id, msg
);
false
} else {
true
Expand Down
2 changes: 1 addition & 1 deletion protocols/gossipsub/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ mod rpc_proto {
include!(concat!(env!("OUT_DIR"), "/gossipsub.pb.rs"));
}

pub use self::behaviour::{Gossipsub, GossipsubEvent, MessageAuthenticity};
pub use self::behaviour::{Gossipsub, GossipsubEvent, MessageAcceptance, MessageAuthenticity};
pub use self::config::{GossipsubConfig, GossipsubConfigBuilder, ValidationMode};
pub use self::peer_score::{
score_parameter_decay, score_parameter_decay_with_base, PeerScoreParams, PeerScoreThresholds,
Expand Down
Loading