diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 726e7f6d987..b1fb8b16191 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -278,6 +278,9 @@ pub struct Gossipsub { /// Counts the number of `IWANT` that we sent the each peer since the last heartbeat. count_iasked: HashMap, + + /// short term cache for published messsage ids + published_message_ids: DuplicateCache, } impl Gossipsub { @@ -321,13 +324,14 @@ impl Gossipsub { config.heartbeat_interval(), ), heartbeat_ticks: 0, - config, px_peers: HashSet::new(), outbound_peers: HashSet::new(), peer_score: None, count_peer_have: HashMap::new(), count_iasked: HashMap::new(), peer_protocols: HashMap::new(), + published_message_ids: DuplicateCache::new(config.published_message_ids_cache_time()), + config, }) } @@ -507,6 +511,12 @@ impl Gossipsub { debug!("Publishing message: {:?}", msg_id); + // If the message is anonymous or has a random author add it to the published message ids + // cache. + if let PublishConfig::RandomAuthor | PublishConfig::Anonymous = self.publish_config { + self.published_message_ids.insert(msg_id.clone()); + } + // If we are not flood publishing forward the message to mesh peers. let mesh_peers_sent = !self.config.flood_publish() && self.forward_msg(message.clone(), None)?; @@ -713,8 +723,11 @@ impl Gossipsub { } let interval = Interval::new(params.decay_interval); - let peer_score = PeerScore::new_with_message_delivery_time_callback(params, self.config - .message_id_fn(), callback); + let peer_score = PeerScore::new_with_message_delivery_time_callback( + params, + self.config.message_id_fn(), + callback, + ); self.peer_score = Some((peer_score, threshold, interval, GossipPromises::default())); Ok(()) } @@ -1025,8 +1038,7 @@ impl Gossipsub { } debug!( "IHAVE: Asking for the following messages from {}: {:?}", - peer_id, - message_ids + peer_id, message_ids ); Self::control_pool_add( @@ -1340,8 +1352,7 @@ impl Gossipsub { if self.blacklisted_peers.contains(source) { debug!( "Rejecting message from peer {} because of blacklisted source: {}", - propagation_source, - source + propagation_source, source ); if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score { peer_score.reject_message( @@ -1363,21 +1374,24 @@ impl Gossipsub { } // reject messages claiming to be from ourselves but not locally published - if let Some(own_id) = self.publish_config.get_own_id() { - if !self.config.allow_self_origin() + let self_published = if let Some(own_id) = self.publish_config.get_own_id() { + !self.config.allow_self_origin() && own_id != propagation_source && msg.source.as_ref().map_or(false, |s| s == own_id) - { - debug!( - "Dropping message {} claiming to be from self but forwarded from {}", - msg_id, propagation_source - ); - if let Some((peer_score, _, _, gossip_promises)) = &mut self.peer_score { - peer_score.reject_message(propagation_source, &msg, RejectReason::SelfOrigin); - gossip_promises.reject_message(&msg_id, &RejectReason::SelfOrigin); - } - return; + } else { + self.published_message_ids.contains(&msg_id) + }; + + if self_published { + debug!( + "Dropping message {} claiming to be from self but forwarded from {}", + msg_id, propagation_source + ); + if let Some((peer_score, _, _, gossip_promises)) = &mut self.peer_score { + peer_score.reject_message(propagation_source, &msg, RejectReason::SelfOrigin); + gossip_promises.reject_message(&msg_id, &RejectReason::SelfOrigin); } + return; } // Add the message to the duplication cache and memcache. @@ -1388,7 +1402,10 @@ impl Gossipsub { } return; } - debug!("Put message {:?} in duplication_cache and resolve promises", &msg_id); + debug!( + "Put message {:?} in duplication_cache and resolve promises", + &msg_id + ); // Tells score that message arrived (but is maybe not fully validated yet) // Consider message as delivered for gossip promises @@ -1911,14 +1928,28 @@ impl Gossipsub { scores }); trace!("Mesh message deliveries: {:?}", { - self.mesh.iter().map(|(t, peers)| { - (t.clone(), peers.iter().map(|p| { - (p.clone(), - peer_score.as_ref().expect("peer_score.is_some()").0 - .mesh_message_deliveries(p, t) - .unwrap_or(0.0)) - }).collect::>()) - }).collect::>>() + self.mesh + .iter() + .map(|(t, peers)| { + ( + t.clone(), + peers + .iter() + .map(|p| { + ( + p.clone(), + peer_score + .as_ref() + .expect("peer_score.is_some()") + .0 + .mesh_message_deliveries(p, t) + .unwrap_or(0.0), + ) + }) + .collect::>(), + ) + }) + .collect::>>() }) } @@ -1936,7 +1967,6 @@ impl Gossipsub { self.mcache.shift(); debug!("Completed Heartbeat"); - } /// Emits gossip - Send IHAVE messages to a random set of gossip peers. This is applied to mesh @@ -2649,6 +2679,12 @@ impl NetworkBehaviour for Gossipsub { if let Some((peer_score, ..)) = &mut self.peer_score { if let Some(ip) = get_ip_addr(get_remote_addr(endpoint)) { peer_score.add_ip(&peer_id, ip); + } else { + trace!( + "Couldn't extract ip from endpoint of peer {} with endpoint {:?}", + peer_id, + endpoint + ) } } } @@ -2663,6 +2699,12 @@ impl NetworkBehaviour for Gossipsub { if let Some((peer_score, ..)) = &mut self.peer_score { if let Some(ip) = get_ip_addr(get_remote_addr(endpoint)) { peer_score.remove_ip(peer, &ip); + } else { + trace!( + "Couldn't extract ip from endpoint of peer {} with endpoint {:?}", + peer, + endpoint + ) } } } @@ -2678,9 +2720,21 @@ impl NetworkBehaviour for Gossipsub { if let Some((peer_score, ..)) = &mut self.peer_score { if let Some(ip) = get_ip_addr(get_remote_addr(endpoint_old)) { peer_score.remove_ip(peer, &ip); + } else { + trace!( + "Couldn't extract ip from endpoint of peer {} with endpoint {:?}", + peer, + endpoint_old + ) } if let Some(ip) = get_ip_addr(get_remote_addr(endpoint_new)) { peer_score.add_ip(&peer, ip); + } else { + trace!( + "Couldn't extract ip from endpoint of peer {} with endpoint {:?}", + peer, + endpoint_new + ) } } } @@ -2727,7 +2781,7 @@ impl NetworkBehaviour for Gossipsub { // Check if peer is graylisted in which case we ignore the event if let (true, _) = - self.score_below_threshold(&propagation_source, |pst| pst.graylist_threshold) + self.score_below_threshold(&propagation_source, |pst| pst.graylist_threshold) { debug!("RPC Dropped from greylisted peer {}", propagation_source); return; diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 56735ceb7ea..02a042b5bed 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -2737,9 +2737,9 @@ mod tests { assert!(match &gs.events[0] { NetworkBehaviourAction::GenerateEvent(event) => match event { GossipsubEvent::Subscribed { .. } => true, - _ => false - } - _ => false + _ => false, + }, + _ => false, }); let control_action = GossipsubControlAction::IHave { diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs index e13bdd0dfa3..a698c47a61e 100644 --- a/protocols/gossipsub/src/config.rs +++ b/protocols/gossipsub/src/config.rs @@ -217,6 +217,9 @@ pub struct GossipsubConfig { /// Enable support for flooodsub peers. Default false. support_floodsub: bool, + + /// Published message ids time cache duration. The default is 10 seconds. + published_message_ids_cache_time: Duration, } impl GossipsubConfig { @@ -455,6 +458,11 @@ impl GossipsubConfig { pub fn support_floodsub(&self) -> bool { self.support_floodsub } + + /// Published message ids time cache duration. The default is 10 seconds. + pub fn published_message_ids_cache_time(&self) -> Duration { + self.published_message_ids_cache_time + } } impl Default for GossipsubConfig { @@ -536,6 +544,7 @@ impl GossipsubConfigBuilder { max_ihave_messages: 10, iwant_followup_time: Duration::from_secs(3), support_floodsub: false, + published_message_ids_cache_time: Duration::from_secs(10), }, } } @@ -783,6 +792,14 @@ impl GossipsubConfigBuilder { self } + pub fn published_message_ids_cache_time( + &mut self, + published_message_ids_cache_time: Duration, + ) -> &mut Self { + self.config.published_message_ids_cache_time = published_message_ids_cache_time; + self + } + /// Constructs a `GossipsubConfig` from the given configuration and validates the settings. pub fn build(&self) -> Result { // check all constraints on config @@ -848,6 +865,10 @@ impl std::fmt::Debug for GossipsubConfig { let _ = builder.field("max_ihave_messages", &self.max_ihave_messages); let _ = builder.field("iwant_followup_time", &self.iwant_followup_time); let _ = builder.field("support_floodsub", &self.support_floodsub); + let _ = builder.field( + "published_message_ids_cache_time", + &self.published_message_ids_cache_time, + ); builder.finish() } } diff --git a/protocols/gossipsub/src/peer_score/mod.rs b/protocols/gossipsub/src/peer_score/mod.rs index 7800f18dee0..feffba13d53 100644 --- a/protocols/gossipsub/src/peer_score/mod.rs +++ b/protocols/gossipsub/src/peer_score/mod.rs @@ -24,7 +24,7 @@ use crate::time_cache::TimeCache; use crate::{GossipsubMessage, MessageId, TopicHash}; use libp2p_core::PeerId; -use log::{debug, warn}; +use log::{debug, trace, warn}; use std::collections::{hash_map, HashMap, HashSet}; use std::net::IpAddr; use std::time::{Duration, Instant}; @@ -203,10 +203,11 @@ impl PeerScore { Self::new_with_message_delivery_time_callback(params, msg_id, None) } - pub fn new_with_message_delivery_time_callback(params: PeerScoreParams, - msg_id: fn(&GossipsubMessage) -> MessageId, - callback: Option) - -> Self { + pub fn new_with_message_delivery_time_callback( + params: PeerScoreParams, + msg_id: fn(&GossipsubMessage) -> MessageId, + callback: Option, + ) -> Self { PeerScore { params, peer_stats: HashMap::new(), @@ -438,6 +439,7 @@ impl PeerScore { /// Adds a new ip to a peer, if the peer is not yet known creates a new peer_stats entry for it pub fn add_ip(&mut self, peer_id: &PeerId, ip: IpAddr) { + trace!("Add ip for peer {}, ip: {}", peer_id, ip); let peer_stats = self.peer_stats.entry(peer_id.clone()).or_default(); // Mark the peer as connected (currently the default is connected, but we don't want to @@ -457,8 +459,21 @@ impl PeerScore { if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) { peer_stats.known_ips.remove(ip); if let Some(peer_ids) = self.peer_ips.get_mut(ip) { + trace!("Remove ip for peer {}, ip: {}", peer_id, ip); peer_ids.remove(peer_id); + } else { + trace!( + "No entry in peer_ips for ip {} which should get removed for peer {}", + ip, + peer_id + ); } + } else { + trace!( + "No peer_stats for peer {} which should remove the ip {}", + peer_id, + ip + ); } } @@ -555,7 +570,8 @@ impl PeerScore { .get(_from) .and_then(|s| s.topics.get(topic)) .map(|ts| ts.in_mesh()) - .unwrap_or(false) { + .unwrap_or(false) + { callback(_from, topic, 0.0); } } @@ -591,6 +607,10 @@ impl PeerScore { match reason { // these messages are not tracked, but the peer is penalized as they are invalid RejectReason::ValidationError(_) | RejectReason::SelfOrigin => { + debug!( + "Message from {} rejected because of ValidationError or SelfOrigin", + from + ); self.mark_invalid_message_delivery(from, msg); return; } @@ -656,7 +676,8 @@ impl PeerScore { .get(from) .and_then(|s| s.topics.get(topic)) .map(|ts| ts.in_mesh()) - .unwrap_or(false) { + .unwrap_or(false) + { callback(from, topic, time); } } @@ -722,7 +743,7 @@ impl PeerScore { } } } - }, + } Vacant(entry) => { entry.insert(params); } @@ -844,8 +865,7 @@ impl PeerScore { } pub(crate) fn mesh_message_deliveries(&self, peer: &PeerId, topic: &TopicHash) -> Option { - self - .peer_stats + self.peer_stats .get(peer) .and_then(|s| s.topics.get(topic)) .map(|t| t.mesh_message_deliveries)