diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 2a3a13ea6e7..fff64f4cdf9 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -3272,7 +3272,19 @@ where _: &Multiaddr, _: &Multiaddr, ) -> Result, ConnectionDenied> { - Ok(Handler::new(self.config.protocol_config())) + Ok(Handler::new( + self.config.protocol_config(), + self.metrics + .as_ref() + .expect("to be enabled") + .messages_added_to_queue + .clone(), + self.metrics + .as_ref() + .expect("to be enabled") + .messages_removed_from_queue + .clone(), + )) } fn handle_established_outbound_connection( @@ -3282,7 +3294,19 @@ where _: &Multiaddr, _: Endpoint, ) -> Result, ConnectionDenied> { - Ok(Handler::new(self.config.protocol_config())) + Ok(Handler::new( + self.config.protocol_config(), + self.metrics + .as_ref() + .expect("to be enabled") + .messages_added_to_queue + .clone(), + self.metrics + .as_ref() + .expect("to be enabled") + .messages_removed_from_queue + .clone(), + )) } fn on_connection_handler_event( diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 44258bb5394..730664c58bd 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -33,6 +33,7 @@ use libp2p_swarm::handler::{ FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol, }; use libp2p_swarm::Stream; +use prometheus_client::metrics::counter::Counter; use smallvec::SmallVec; use std::{ pin::Pin, @@ -120,6 +121,9 @@ pub struct EnabledHandler { /// Keeps track of whether this connection is for a peer in the mesh. This is used to make /// decisions about the keep alive state for this connection. in_mesh: bool, + + messages_added_to_queue: Counter, + messages_removed_from_queue: Counter, } pub enum DisabledHandler { @@ -159,7 +163,11 @@ enum OutboundSubstreamState { impl Handler { /// Builds a new [`Handler`]. - pub fn new(protocol_config: ProtocolConfig) -> Self { + pub fn new( + protocol_config: ProtocolConfig, + messages_added_to_queue: Counter, + messages_removed_from_queue: Counter, + ) -> Self { Handler::Enabled(EnabledHandler { listen_protocol: protocol_config, inbound_substream: None, @@ -172,6 +180,8 @@ impl Handler { peer_kind_sent: false, last_io_activity: Instant::now(), in_mesh: false, + messages_added_to_queue, + messages_removed_from_queue, }) } } @@ -316,6 +326,7 @@ impl EnabledHandler { // outbound idle state Some(OutboundSubstreamState::WaitingOutput(substream)) => { if let Some(message) = self.send_queue.pop() { + self.messages_removed_from_queue.inc(); self.send_queue.shrink_to_fit(); self.outbound_substream = Some(OutboundSubstreamState::PendingSend(substream, message)); @@ -409,7 +420,10 @@ impl ConnectionHandler for Handler { fn on_behaviour_event(&mut self, message: HandlerIn) { match self { Handler::Enabled(handler) => match message { - HandlerIn::Message(m) => handler.send_queue.push(m), + HandlerIn::Message(m) => { + handler.send_queue.push(m); + handler.messages_added_to_queue.inc(); + } HandlerIn::JoinedMesh => { handler.in_mesh = true; } diff --git a/protocols/gossipsub/src/metrics.rs b/protocols/gossipsub/src/metrics.rs index e044ca67e71..959ba27cba5 100644 --- a/protocols/gossipsub/src/metrics.rs +++ b/protocols/gossipsub/src/metrics.rs @@ -174,6 +174,9 @@ pub(crate) struct Metrics { /// The number of times we have decided that an IWANT control message is required for this /// topic. A very high metric might indicate an underperforming network. topic_iwant_msgs: Family, + + pub messages_added_to_queue: Counter, + pub messages_removed_from_queue: Counter, } impl Metrics { @@ -301,6 +304,24 @@ impl Metrics { ); metric }; + let messages_added_to_queue = { + let metric = Counter::default(); + registry.register( + "messages_added_to_queue", + "TODO", + metric.clone(), + ); + metric + }; + let messages_removed_from_queue = { + let metric = Counter::default(); + registry.register( + "messages_removed_from_queue", + "TODO", + metric.clone(), + ); + metric + }; Self { max_topics, @@ -327,6 +348,8 @@ impl Metrics { heartbeat_duration, memcache_misses, topic_iwant_msgs, + messages_added_to_queue, + messages_removed_from_queue, } }