From 7cd21bf22403382316405032407056073624f0c4 Mon Sep 17 00:00:00 2001 From: blacktemplar Date: Wed, 12 Aug 2020 08:47:52 +0200 Subject: [PATCH 01/11] remove dbg! calls and add debug logging for peer scoring --- protocols/gossipsub/src/behaviour.rs | 1 + protocols/gossipsub/src/peer_score/mod.rs | 32 ++++++++++++++++------- 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index a16db36fce2..f3371bffba4 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -1885,6 +1885,7 @@ impl Gossipsub { self.mcache.shift(); debug!("Completed Heartbeat"); + debug!("peer_scores: {:?}", scores); } /// Emits gossip - Send IHAVE messages to a random set of gossip peers. This is applied to mesh diff --git a/protocols/gossipsub/src/peer_score/mod.rs b/protocols/gossipsub/src/peer_score/mod.rs index e3af0a63ada..9fa82c3c18f 100644 --- a/protocols/gossipsub/src/peer_score/mod.rs +++ b/protocols/gossipsub/src/peer_score/mod.rs @@ -3,7 +3,7 @@ use crate::time_cache::TimeCache; use crate::{GossipsubMessage, MessageId, TopicHash}; use libp2p_core::PeerId; -use log::warn; +use log::{debug, warn}; use std::collections::{hash_map, HashMap, HashSet}; use std::net::IpAddr; use std::time::{Duration, Instant}; @@ -217,9 +217,7 @@ impl PeerScore { topic_params.time_in_mesh_cap } }; - dbg!(topic_score); topic_score += p1 * topic_params.time_in_mesh_weight; - dbg!(topic_score); } // P2: first message deliveries @@ -232,7 +230,6 @@ impl PeerScore { } }; topic_score += p2 * topic_params.first_message_deliveries_weight; - dbg!(topic_score); // P3: mesh message deliveries if topic_stats.mesh_message_deliveries_active { @@ -243,9 +240,16 @@ impl PeerScore { - topic_stats.mesh_message_deliveries; let p3 = deficit * deficit; topic_score += p3 * topic_params.mesh_message_deliveries_weight; + debug!( + "The peer {} has a mesh message delivieries deficit of {} in topic\ + {} and will get penalized by {}", + peer_id, + deficit, + topic, + p3 * topic_params.mesh_message_deliveries_weight + ); } } - dbg!(topic_score); // P3b: // NOTE: the weight of P3b is negative (validated in TopicScoreParams.validate), so this detracts. @@ -257,11 +261,9 @@ impl PeerScore { let p4 = topic_stats.invalid_message_deliveries * topic_stats.invalid_message_deliveries; topic_score += p4 * topic_params.invalid_message_deliveries_weight; - dbg!(topic_score); // update score, mixing with topic weight score += topic_score * topic_params.topic_weight; - dbg!(topic_score); } } @@ -269,8 +271,6 @@ impl PeerScore { if self.params.topic_score_cap > 0f64 && score > self.params.topic_score_cap { score = self.params.topic_score_cap; } - dbg!("after"); - dbg!(score); // P5: application-specific score let p5 = peer_stats.application_score; @@ -290,6 +290,11 @@ impl PeerScore { if (peers_in_ip as f64) > self.params.ip_colocation_factor_threshold { let surplus = (peers_in_ip as f64) - self.params.ip_colocation_factor_threshold; let p6 = surplus * surplus; + debug!( + "The peer {} gets penalized because of too many peers with the ip {}. \ + The surplus is {}. ", + peer_id, ip, surplus + ); score += p6 * self.params.ip_colocation_factor_weight; } } @@ -303,6 +308,10 @@ impl PeerScore { pub fn add_penalty(&mut self, peer_id: &PeerId, count: usize) { if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) { + debug!( + "Behavioral penalty for peer {}, count = {}.", + peer_id, count + ); peer_stats.behaviour_penalty += count as f64; } } @@ -630,6 +639,11 @@ impl PeerScore { if let Some(topic_stats) = peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params) { + debug!( + "Peer {} delivered an invalid messag in topic {} and gets penalized \ + for it", + peer_id, topic_hash + ); topic_stats.invalid_message_deliveries += 1f64; } } From 56f42473c92ca5809830647c66a62c8475c1d5f4 Mon Sep 17 00:00:00 2001 From: blacktemplar Date: Wed, 12 Aug 2020 11:38:09 +0200 Subject: [PATCH 02/11] export MessageAcceptance and rename validate_message to report_message_validation_result to also signal that this message should get called in case of invalid messages --- protocols/gossipsub/src/behaviour.rs | 9 +++++---- protocols/gossipsub/src/behaviour/tests.rs | 16 ++++++++-------- protocols/gossipsub/src/lib.rs | 2 +- 3 files changed, 14 insertions(+), 13 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index f3371bffba4..b3a8da1edef 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -686,9 +686,10 @@ impl Gossipsub { } /// This function should be called when `config.validate_messages()` is `true` after the - /// message got validated by the caller. Messages are stored in the ['Memcache'] and validation - /// is expected to be fast enough that the messages should still exist in the cache.There are - /// three possible validation outcomes and the outcome is given in acceptance. + /// message got validated by the caller. Messages are stored in the + /// ['Memcache'] and validation is expected to be fast enough that the messages should still + /// exist in the cache. There are three possible validation outcomes and the outcome is given + /// in acceptance. /// /// If acceptance = Accept the message will get propagated to the network. The /// `propagation_source` parameter indicates who the message was received by and will not @@ -704,7 +705,7 @@ impl Gossipsub { /// in the cache anymore. /// /// This should only be called once per message. - pub fn validate_message( + pub fn report_message_validation_result( &mut self, message_id: &MessageId, propagation_source: &PeerId, diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index c2515b3ed4a..1d5efc7ab53 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -3186,7 +3186,7 @@ mod tests { assert_eq!(gs.peer_score.as_ref().unwrap().0.score(&peers[0]), 0.0); //message m1 gets validated - gs.validate_message(&id(&m1), &peers[0], MessageAcceptance::Accept); + gs.report_message_validation_result(&id(&m1), &peers[0], MessageAcceptance::Accept); assert_eq!(gs.peer_score.as_ref().unwrap().0.score(&peers[0]), 0.0); } @@ -3348,7 +3348,7 @@ mod tests { assert_eq!(gs.peer_score.as_ref().unwrap().0.score(&peers[0]), 0.0); //message m1 gets ignored - gs.validate_message( + gs.report_message_validation_result( &(config.message_id_fn())(&m1), &peers[0], MessageAcceptance::Ignore, @@ -3404,7 +3404,7 @@ mod tests { assert_eq!(gs.peer_score.as_ref().unwrap().0.score(&peers[0]), 0.0); //message m1 gets rejected - gs.validate_message( + gs.report_message_validation_result( &(config.message_id_fn())(&m1), &peers[0], MessageAcceptance::Reject, @@ -3467,7 +3467,7 @@ mod tests { assert_eq!(gs.peer_score.as_ref().unwrap().0.score(&peers[1]), 0.0); //message m1 gets rejected - gs.validate_message( + gs.report_message_validation_result( &(config.message_id_fn())(&m1), &peers[0], MessageAcceptance::Reject, @@ -3534,17 +3534,17 @@ mod tests { assert_eq!(gs.peer_score.as_ref().unwrap().0.score(&peers[0]), 0.0); //messages gets rejected - gs.validate_message( + gs.report_message_validation_result( &(config.message_id_fn())(&m1), &peers[0], MessageAcceptance::Reject, ); - gs.validate_message( + gs.report_message_validation_result( &(config.message_id_fn())(&m2), &peers[0], MessageAcceptance::Reject, ); - gs.validate_message( + gs.report_message_validation_result( &(config.message_id_fn())(&m3), &peers[0], MessageAcceptance::Reject, @@ -3604,7 +3604,7 @@ mod tests { assert_eq!(gs.peer_score.as_ref().unwrap().0.score(&peers[0]), 0.0); //message m1 gets rejected - gs.validate_message( + gs.report_message_validation_result( &(config.message_id_fn())(&m1), &peers[0], MessageAcceptance::Reject, diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs index 7454783bded..0b5fab4a6b5 100644 --- a/protocols/gossipsub/src/lib.rs +++ b/protocols/gossipsub/src/lib.rs @@ -152,7 +152,7 @@ mod rpc_proto { include!(concat!(env!("OUT_DIR"), "/gossipsub.pb.rs")); } -pub use self::behaviour::{Gossipsub, GossipsubEvent, MessageAuthenticity}; +pub use self::behaviour::{Gossipsub, GossipsubEvent, MessageAcceptance, MessageAuthenticity}; pub use self::config::{GossipsubConfig, GossipsubConfigBuilder, ValidationMode}; pub use self::peer_score::{ score_parameter_decay, score_parameter_decay_with_base, PeerScoreParams, PeerScoreThresholds, From 3dcf35888d05fcfe5ac35e077b8d5c293df2c6ad Mon Sep 17 00:00:00 2001 From: blacktemplar Date: Wed, 12 Aug 2020 11:53:01 +0200 Subject: [PATCH 03/11] fix double reject_message call --- protocols/gossipsub/src/behaviour.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index b3a8da1edef..badf892eb06 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -739,9 +739,6 @@ impl Gossipsub { message_id, reject_reason, ); - if let Some((peer_score, ..)) = &mut self.peer_score { - peer_score.reject_message(propagation_source, &message, reject_reason); - } true } else { warn!("Rejected message not in cache. Message Id: {}", message_id); From 6dc805eb65b2be92867d9917709e325028c3a9c1 Mon Sep 17 00:00:00 2001 From: blacktemplar Date: Wed, 12 Aug 2020 12:39:03 +0200 Subject: [PATCH 04/11] gossip promises are fulfilled already on receiving the message without validation --- protocols/gossipsub/src/behaviour.rs | 48 +++++++++------------------- 1 file changed, 15 insertions(+), 33 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index badf892eb06..0e1cdb39deb 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -731,14 +731,10 @@ impl Gossipsub { }; if let Some(message) = self.mcache.remove(message_id) { - //tell peer_score and gossip promises about reject - Self::reject_message( - &mut self.peer_score, - propagation_source, - &message, - message_id, - reject_reason, - ); + //tell peer_score about reject + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.reject_message(propagation_source, &message, reject_reason); + } true } else { warn!("Rejected message not in cache. Message Id: {}", message_id); @@ -1313,20 +1309,6 @@ impl Gossipsub { } } - /// informs peer score and gossip_promises about a rejected message - fn reject_message( - peer_score: &mut Option<(PeerScore, PeerScoreThresholds, Interval, GossipPromises)>, - from: &PeerId, - msg: &GossipsubMessage, - id: &MessageId, - reason: RejectReason, - ) { - if let Some((peer_score, _, _, gossip_promises)) = peer_score { - peer_score.reject_message(from, &msg, reason); - gossip_promises.reject_message(id, &reason); - } - } - /// Handles a newly received GossipsubMessage. /// Forwards the message to all peers in the mesh. fn handle_received_message(&mut self, mut msg: GossipsubMessage, propagation_source: &PeerId) { @@ -1351,13 +1333,10 @@ impl Gossipsub { "Dropping message claiming to be from self but forwarded from {:?}", propagation_source ); - Self::reject_message( - &mut self.peer_score, - propagation_source, - &msg, - &msg_id, - RejectReason::SelfOrigin, - ); + if let Some((peer_score, _, _, gossip_promises)) = &mut self.peer_score { + peer_score.reject_message(propagation_source, &msg, RejectReason::SelfOrigin); + gossip_promises.reject_message(&msg_id, &RejectReason::SelfOrigin); + } return; } } @@ -1372,8 +1351,10 @@ impl Gossipsub { } //tells score that message arrived (but is maybe not fully validated yet) - if let Some((peer_score, ..)) = &mut self.peer_score { + //Consider message as delivered for gossip promises + if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score { peer_score.validate_message(propagation_source, &msg); + gossip_promises.deliver_message(&msg_id); } self.mcache.put(msg.clone()); @@ -2031,12 +2012,11 @@ impl Gossipsub { fn forward_msg(&mut self, message: GossipsubMessage, source: Option<&PeerId>) -> bool { let msg_id = (self.config.message_id_fn())(&message); - //message is fully validated, inform peer_score and gossip promises + //message is fully validated inform peer_score if let Some((peer_score, _, _, gossip_promises)) = &mut self.peer_score { if let Some(peer) = source { peer_score.deliver_message(peer, &message); } - gossip_promises.deliver_message(&msg_id); } debug!("Forwarding message: {:?}", msg_id); @@ -2474,10 +2454,12 @@ impl NetworkBehaviour for Gossipsub { invalid_messages, } => { // Handle any invalid messages from this peer - if let Some((peer_score, ..)) = &mut self.peer_score { + if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score { + let mut id_fn = self.config.message_id_fn(); for (_message, validation_error) in invalid_messages { let reason = RejectReason::ProtocolValidationError(validation_error); peer_score.reject_message(&propagation_source, &_message, reason); + gossip_promises.reject_message(&id_fn(&_message), &reason); } } From 3c96d51fe6633a840d532ff201d32c66f8addcd0 Mon Sep 17 00:00:00 2001 From: blacktemplar Date: Wed, 12 Aug 2020 12:39:03 +0200 Subject: [PATCH 05/11] derive debug for MessageAcceptance --- protocols/gossipsub/src/behaviour.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 0e1cdb39deb..bbdd54fad38 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -344,6 +344,7 @@ impl BackoffStorage { } } +#[derive(Debug)] pub enum MessageAcceptance { /// The message is considered valid, and it should be delivered and forwarded to the network Accept, From 1f9b9630a7c87769d8d7903bb603a1ab5096c6da Mon Sep 17 00:00:00 2001 From: blacktemplar Date: Wed, 12 Aug 2020 13:34:29 +0200 Subject: [PATCH 06/11] add helper method to get config builder from existing config --- protocols/gossipsub/src/config.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs index aeba49ef853..462fc59e6f8 100644 --- a/protocols/gossipsub/src/config.rs +++ b/protocols/gossipsub/src/config.rs @@ -457,6 +457,12 @@ impl Default for GossipsubConfigBuilder { } } +impl From for GossipsubConfigBuilder { + fn from(config: GossipsubConfig) -> Self { + GossipsubConfigBuilder { config } + } +} + impl GossipsubConfigBuilder { // set default values pub fn new() -> GossipsubConfigBuilder { From cd9c22236486c19281d8f3bb662730f8db779847 Mon Sep 17 00:00:00 2001 From: blacktemplar Date: Thu, 13 Aug 2020 08:17:16 +0200 Subject: [PATCH 07/11] allow adding/changing TopicScoreParams during runtime --- protocols/gossipsub/src/behaviour.rs | 8 +++++++- protocols/gossipsub/src/peer_score/mod.rs | 4 ++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index bbdd54fad38..2c6f5e09e58 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -55,7 +55,6 @@ use crate::handler::{GossipsubHandler, HandlerEvent}; use crate::mcache::MessageCache; use crate::peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason}; use crate::protocol::SIGNING_PREFIX; -use crate::rpc_proto; use crate::time_cache::DuplicateCache; use crate::topic::{Hasher, Topic, TopicHash}; use crate::types::{ @@ -63,6 +62,7 @@ use crate::types::{ MessageId, PeerInfo, }; use crate::types::{GossipsubRpc, PeerKind}; +use crate::{rpc_proto, TopicScoreParams}; use std::cmp::Ordering::Equal; mod tests; @@ -777,6 +777,12 @@ impl Gossipsub { Ok(()) } + pub fn set_topic_params(&mut self, topic_hash: TopicHash, params: TopicScoreParams) { + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.set_topic_params(topic_hash, params); + } + } + /// Sets the application specific score for a peer. Returns true if scoring is active and /// the peer is connected or if the score of the peer is not yet expired, false otherwise. pub fn set_application_score(&mut self, peer_id: &PeerId, new_score: f64) -> bool { diff --git a/protocols/gossipsub/src/peer_score/mod.rs b/protocols/gossipsub/src/peer_score/mod.rs index 9fa82c3c18f..3697b2aae75 100644 --- a/protocols/gossipsub/src/peer_score/mod.rs +++ b/protocols/gossipsub/src/peer_score/mod.rs @@ -631,6 +631,10 @@ impl PeerScore { } } + pub fn set_topic_params(&mut self, topic_hash: TopicHash, params: TopicScoreParams) { + self.params.topics.insert(topic_hash, params); + } + /// Increments the "invalid message deliveries" counter for all scored topics the message /// is published in. fn mark_invalid_message_delivery(&mut self, peer_id: &PeerId, msg: &GossipsubMessage) { From 49dba5374b88dcb8fb352470446ac955201182d1 Mon Sep 17 00:00:00 2001 From: blacktemplar Date: Thu, 13 Aug 2020 11:20:53 +0200 Subject: [PATCH 08/11] more debug output for messages from self --- protocols/gossipsub/src/behaviour.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 2c6f5e09e58..da2f17b7dd6 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -1337,8 +1337,8 @@ impl Gossipsub { if let Some(own_id) = self.publish_config.get_own_id() { if own_id != propagation_source && msg.source.as_ref().map_or(false, |s| s == own_id) { debug!( - "Dropping message claiming to be from self but forwarded from {:?}", - propagation_source + "Dropping message {:?} claiming to be from self but forwarded from {:?}", + msg_id, propagation_source ); if let Some((peer_score, _, _, gossip_promises)) = &mut self.peer_score { peer_score.reject_message(propagation_source, &msg, RejectReason::SelfOrigin); From 69cc78408d97c6f7d104779121d5a34210bb4e99 Mon Sep 17 00:00:00 2001 From: blacktemplar Date: Thu, 13 Aug 2020 13:20:26 +0200 Subject: [PATCH 09/11] fixes incompatibility with anonymous PeerId in lighthouse --- protocols/gossipsub/src/behaviour.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index da2f17b7dd6..24c7a9a046c 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -1335,7 +1335,12 @@ impl Gossipsub { // reject messages claiming to be from ourselves but not locally published if let Some(own_id) = self.publish_config.get_own_id() { - if own_id != propagation_source && msg.source.as_ref().map_or(false, |s| s == own_id) { + + //TODO remove this "hack" as soon as lighthouse uses Anonymous instead of this fixed + // PeerId. + let lighthouse_anonymous_id = PeerId::from_bytes(vec![0, 1, 0]).expect("Valid peer id"); + if own_id != &lighthouse_anonymous_id && own_id != propagation_source && + msg.source.as_ref().map_or(false, |s| s == own_id) { debug!( "Dropping message {:?} claiming to be from self but forwarded from {:?}", msg_id, propagation_source @@ -2044,7 +2049,7 @@ impl Gossipsub { //add explicit peers for p in &self.explicit_peers { if let Some(topics) = self.peer_topics.get(p) { - if message.topics.iter().any(|t| topics.contains(t)) { + if Some(p) != source && message.topics.iter().any(|t| topics.contains(t)) { recipient_peers.insert(p.clone()); } } From 7115fced6667bb9770dc33bc0babd795e1141894 Mon Sep 17 00:00:00 2001 From: blacktemplar Date: Thu, 13 Aug 2020 14:07:41 +0200 Subject: [PATCH 10/11] cargo fmt --- protocols/gossipsub/src/behaviour.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 24c7a9a046c..58946806a42 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -1335,12 +1335,13 @@ impl Gossipsub { // reject messages claiming to be from ourselves but not locally published if let Some(own_id) = self.publish_config.get_own_id() { - //TODO remove this "hack" as soon as lighthouse uses Anonymous instead of this fixed // PeerId. let lighthouse_anonymous_id = PeerId::from_bytes(vec![0, 1, 0]).expect("Valid peer id"); - if own_id != &lighthouse_anonymous_id && own_id != propagation_source && - msg.source.as_ref().map_or(false, |s| s == own_id) { + if own_id != &lighthouse_anonymous_id + && own_id != propagation_source + && msg.source.as_ref().map_or(false, |s| s == own_id) + { debug!( "Dropping message {:?} claiming to be from self but forwarded from {:?}", msg_id, propagation_source From bf9154bf2f72f89b7787c6ef097ea2d3adfa1d7e Mon Sep 17 00:00:00 2001 From: blacktemplar Date: Thu, 13 Aug 2020 14:50:59 +0200 Subject: [PATCH 11/11] more debug output for broken promises --- protocols/gossipsub/src/gossip_promises.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/protocols/gossipsub/src/gossip_promises.rs b/protocols/gossipsub/src/gossip_promises.rs index b692ba5cc41..4419c04f4f2 100644 --- a/protocols/gossipsub/src/gossip_promises.rs +++ b/protocols/gossipsub/src/gossip_promises.rs @@ -2,6 +2,7 @@ use crate::error::ValidationError; use crate::peer_score::RejectReason; use crate::MessageId; use libp2p_core::PeerId; +use log::debug; use rand::seq::SliceRandom; use rand::thread_rng; use std::collections::HashMap; @@ -56,11 +57,15 @@ impl GossipPromises { pub fn get_broken_promises(&mut self) -> HashMap { let now = Instant::now(); let mut result = HashMap::new(); - self.promises.retain(|_, peers| { + self.promises.retain(|msg, peers| { peers.retain(|peer_id, expires| { if *expires < now { let count = result.entry(peer_id.clone()).or_insert(0); *count += 1; + debug!( + "The peer {} broke the promise to deliver message {} in time!", + peer_id, msg + ); false } else { true