Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

misc/metrics: Track # connected nodes supporting specific protocol #2734

Merged
merged 16 commits into from
Jul 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions misc/metrics/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@

- Update to `libp2p-kad` `v0.39.0`.

- Track number of connected nodes supporting a specific protocol via the identify protocol. See [PR 2734].

[PR 2734]: https://github.com/libp2p/rust-libp2p/pull/2734/

# 0.7.0

- Update to `libp2p-core` `v0.34.0`.
Expand Down
5 changes: 2 additions & 3 deletions misc/metrics/src/dcutr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,9 @@ impl From<&libp2p_dcutr::behaviour::Event> for EventType {
}
}

impl super::Recorder<libp2p_dcutr::behaviour::Event> for super::Metrics {
impl super::Recorder<libp2p_dcutr::behaviour::Event> for Metrics {
fn record(&self, event: &libp2p_dcutr::behaviour::Event) {
self.dcutr
.events
self.events
.get_or_create(&EventLabels {
event: event.into(),
})
Expand Down
4 changes: 2 additions & 2 deletions misc/metrics/src/gossipsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ impl Metrics {
}
}

impl super::Recorder<libp2p_gossipsub::GossipsubEvent> for super::Metrics {
impl super::Recorder<libp2p_gossipsub::GossipsubEvent> for Metrics {
fn record(&self, event: &libp2p_gossipsub::GossipsubEvent) {
if let libp2p_gossipsub::GossipsubEvent::Message { .. } = event {
self.gossipsub.messages.inc();
self.messages.inc();
}
}
}
145 changes: 135 additions & 10 deletions misc/metrics/src/identify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,18 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use libp2p_core::PeerId;
use prometheus_client::encoding::text::{EncodeMetric, Encoder};
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};
use prometheus_client::metrics::MetricType;
use prometheus_client::registry::Registry;
use std::collections::HashMap;
use std::iter;
use std::sync::{Arc, Mutex};

pub struct Metrics {
protocols: Protocols,
error: Counter,
pushed: Counter,
received: Counter,
Expand All @@ -36,6 +42,15 @@ impl Metrics {
pub fn new(registry: &mut Registry) -> Self {
let sub_registry = registry.sub_registry_with_prefix("identify");

let protocols = Protocols::default();
sub_registry.register(
"protocols",
"Number of connected nodes supporting a specific protocol, with \
\"unrecognized\" for each peer supporting one or more unrecognized \
protocols",
Box::new(protocols.clone()),
);

let error = Counter::default();
sub_registry.register(
"errors",
Expand Down Expand Up @@ -86,6 +101,7 @@ impl Metrics {
);

Self {
protocols,
error,
pushed,
received,
Expand All @@ -96,27 +112,136 @@ impl Metrics {
}
}

impl super::Recorder<libp2p_identify::IdentifyEvent> for super::Metrics {
impl super::Recorder<libp2p_identify::IdentifyEvent> for Metrics {
fn record(&self, event: &libp2p_identify::IdentifyEvent) {
match event {
libp2p_identify::IdentifyEvent::Error { .. } => {
self.identify.error.inc();
self.error.inc();
}
libp2p_identify::IdentifyEvent::Pushed { .. } => {
self.identify.pushed.inc();
self.pushed.inc();
}
libp2p_identify::IdentifyEvent::Received { info, .. } => {
self.identify.received.inc();
self.identify
.received_info_protocols
libp2p_identify::IdentifyEvent::Received { peer_id, info, .. } => {
{
let mut protocols: Vec<String> = info
.protocols
.iter()
.filter(|p| {
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
let allowed_protocols: &[&[u8]] = &[
#[cfg(feature = "dcutr")]
libp2p_dcutr::PROTOCOL_NAME,
// #[cfg(feature = "gossipsub")]
// #[cfg(not(target_os = "unknown"))]
// TODO: Add Gossipsub protocol name
libp2p_identify::PROTOCOL_NAME,
libp2p_identify::PUSH_PROTOCOL_NAME,
#[cfg(feature = "kad")]
libp2p_kad::protocol::DEFAULT_PROTO_NAME,
#[cfg(feature = "ping")]
libp2p_ping::PROTOCOL_NAME,
#[cfg(feature = "relay")]
libp2p_relay::v2::STOP_PROTOCOL_NAME,
#[cfg(feature = "relay")]
libp2p_relay::v2::HOP_PROTOCOL_NAME,
];

allowed_protocols.contains(&p.as_bytes())
})
.cloned()
.collect();

// Signal via an additional label value that one or more
// protocols of the remote peer have not been recognized.
if protocols.len() < info.protocols.len() {
protocols.push("unrecognized".to_string());
}

protocols.sort_unstable();
protocols.dedup();
Comment on lines +153 to +160
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will flatten all unrecognized into 1, regardless of how many unrecognized protocols there are.

Would it make sense to increment a separate counter for the number of unrecognized protocols?

Copy link
Member Author

@mxinden mxinden Jul 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to increment a separate counter for the number of unrecognized protocols?

We would then need to track the number of unrecognized protocols per peer. On each new identify message, we would have to subtract the previous amount and add the new amount to the counter. I am not saying this is impossible, just adds a bit of complexity which I am not sure is worth the benefit. In other words, is it relevant for a user to know how many, potentially duplicate, unrecognized protocols all connected peers offer as a sum?

I was hoping for the addition to the # HELP text to resolve any confusion:

with "unrecognized" for each peer supporting one or more unrecognized protocols

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to increment a separate counter for the number of unrecognized protocols?

We would then need to track the number of unrecognized protocols per peer. On each new identify message, we would have to subtract the previous amount and add the new amount to the counter.

I think this would only apply if we would track it as a gauge?

A counter always increases anyway and you need to use functions like rate to see how it changes over time.

A spike in the unrecognized protocols rate could e.g. hint at a spam attack by a peer.

I don't feel strongly about it though, happy to go either way :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to increment a separate counter for the number of unrecognized protocols?

We would then need to track the number of unrecognized protocols per peer. On each new identify message, we would have to subtract the previous amount and add the new amount to the counter.

I think this would only apply if we would track it as a gauge?

A counter always increases anyway and you need to use functions like rate to see how it changes over time.

The rate on a counter would only tell how often you saw a unrecognized protocol, not how many unrecognized protocols connected peers offer. One could correlate this with the interval libp2p-identify requests a new identification, though that is (a) neither specified nor (b) solid with the push mechanism.

A spike in the unrecognized protocols rate could e.g. hint at a spam attack by a peer.

True, though I think we would want to prevent such attack before it happens. In addition, given that this attack would potentially bring down our Prometheus instance, who is going to alert us of the spam attack?

Copy link
Contributor

@thomaseizinger thomaseizinger Jul 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to increment a separate counter for the number of unrecognized protocols?

We would then need to track the number of unrecognized protocols per peer. On each new identify message, we would have to subtract the previous amount and add the new amount to the counter.

I think this would only apply if we would track it as a gauge?
A counter always increases anyway and you need to use functions like rate to see how it changes over time.

The rate on a counter would only tell how often you saw a unrecognized protocol, not how many unrecognized protocols connected peers offer. One could correlate this with the interval libp2p-identify requests a new identification, though that is (a) neither specified nor (b) solid with the push mechanism.

Assuming a stable set of peers, the rate of unrecognised protocols should be stable too? Identify pushes might throw a little spanner in here and there but overall, I'd expect it to be meaningful.

A spike in the unrecognized protocols rate could e.g. hint at a spam attack by a peer.

True, though I think we would want to prevent such attack before it happens. In addition, given that this attack would potentially bring down our Prometheus instance, who is going to alert us of the spam attack?

We are preventing the attack by only increasing a counter instead of creating a label per protocol. Surely prometheus can't be brought down just because a counter increases at a massive rate?

All I am saying is that increasing the counter by the number of unrecognised protocols should allow us to observe the attempt of an attack assuming that a non-malicious node will have a stable number of unrecognised protocols whereas an attack would likely max out the size limit of the incoming message with as many protocols as possible and thus increase the counter by more than ordinary behaviour.

It is not massively useful outside of this I think so happy to go either way.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A spike in the unrecognized protocols rate could e.g. hint at a spam attack by a peer.

True, though I think we would want to prevent such attack before it happens. In addition, given that this attack would potentially bring down our Prometheus instance, who is going to alert us of the spam attack?

We are preventing the attack by only increasing a counter instead of creating a label per protocol. Surely prometheus can't be brought down just because a counter increases at a massive rate?

Oh error in reasoning on my end here. You are right. Sorry about that @thomaseizinger.

It is not massively useful outside of this I think so happy to go either way.

I will go without it.


self.protocols.add(*peer_id, protocols);
}

self.received.inc();
self.received_info_protocols
.observe(info.protocols.len() as f64);
self.identify
.received_info_listen_addrs
self.received_info_listen_addrs
.observe(info.listen_addrs.len() as f64);
}
libp2p_identify::IdentifyEvent::Sent { .. } => {
self.identify.sent.inc();
self.sent.inc();
}
}
}
}

impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleErr>> for Metrics {
fn record(&self, event: &libp2p_swarm::SwarmEvent<TBvEv, THandleErr>) {
if let libp2p_swarm::SwarmEvent::ConnectionClosed {
peer_id,
num_established,
..
} = event
{
if *num_established == 0 {
self.protocols.remove(*peer_id)
}
}
}
}

#[derive(Default, Clone)]
struct Protocols {
peers: Arc<Mutex<HashMap<PeerId, Vec<String>>>>,
}

impl Protocols {
fn add(&self, peer: PeerId, protocols: Vec<String>) {
self.peers
.lock()
.expect("Lock not to be poisoned")
.insert(peer, protocols);
}

fn remove(&self, peer: PeerId) {
self.peers
.lock()
.expect("Lock not to be poisoned")
.remove(&peer);
}
}

impl EncodeMetric for Protocols {
fn encode(&self, mut encoder: Encoder) -> Result<(), std::io::Error> {
let count_by_protocol = self
.peers
.lock()
.expect("Lock not to be poisoned")
.iter()
.fold(
HashMap::<String, u64>::default(),
|mut acc, (_, protocols)| {
for protocol in protocols {
let count = acc.entry(protocol.to_string()).or_default();
*count = *count + 1;
}
acc
},
);

for (protocol, count) in count_by_protocol {
encoder
.with_label_set(&("protocol", protocol))
.no_suffix()?
.no_bucket()?
.encode_value(count)?
.no_exemplar()?;
}

Ok(())
}

fn metric_type(&self) -> MetricType {
MetricType::Gauge
}
}
40 changes: 12 additions & 28 deletions misc/metrics/src/kad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,62 +159,52 @@ impl Metrics {
}
}

impl super::Recorder<libp2p_kad::KademliaEvent> for super::Metrics {
impl super::Recorder<libp2p_kad::KademliaEvent> for Metrics {
fn record(&self, event: &libp2p_kad::KademliaEvent) {
match event {
libp2p_kad::KademliaEvent::OutboundQueryCompleted { result, stats, .. } => {
self.kad
.query_result_num_requests
self.query_result_num_requests
.get_or_create(&result.into())
.observe(stats.num_requests().into());
self.kad
.query_result_num_success
self.query_result_num_success
.get_or_create(&result.into())
.observe(stats.num_successes().into());
self.kad
.query_result_num_failure
self.query_result_num_failure
.get_or_create(&result.into())
.observe(stats.num_failures().into());
if let Some(duration) = stats.duration() {
self.kad
.query_result_duration
self.query_result_duration
.get_or_create(&result.into())
.observe(duration.as_secs_f64());
}

match result {
libp2p_kad::QueryResult::GetRecord(result) => match result {
Ok(ok) => self
.kad
.query_result_get_record_ok
.observe(ok.records.len() as f64),
Err(error) => {
self.kad
.query_result_get_record_error
self.query_result_get_record_error
.get_or_create(&error.into())
.inc();
}
},
libp2p_kad::QueryResult::GetClosestPeers(result) => match result {
Ok(ok) => self
.kad
.query_result_get_closest_peers_ok
.observe(ok.peers.len() as f64),
Err(error) => {
self.kad
.query_result_get_closest_peers_error
self.query_result_get_closest_peers_error
.get_or_create(&error.into())
.inc();
}
},
libp2p_kad::QueryResult::GetProviders(result) => match result {
Ok(ok) => self
.kad
.query_result_get_providers_ok
.observe(ok.providers.len() as f64),
Err(error) => {
self.kad
.query_result_get_providers_error
self.query_result_get_providers_error
.get_or_create(&error.into())
.inc();
}
Expand All @@ -230,16 +220,14 @@ impl super::Recorder<libp2p_kad::KademliaEvent> for super::Metrics {
} => {
let bucket = low.ilog2().unwrap_or(0);
if *is_new_peer {
self.kad
.routing_updated
self.routing_updated
.get_or_create(&RoutingUpdated {
action: RoutingAction::Added,
bucket,
})
.inc();
} else {
self.kad
.routing_updated
self.routing_updated
.get_or_create(&RoutingUpdated {
action: RoutingAction::Updated,
bucket,
Expand All @@ -248,8 +236,7 @@ impl super::Recorder<libp2p_kad::KademliaEvent> for super::Metrics {
}

if old_peer.is_some() {
self.kad
.routing_updated
self.routing_updated
.get_or_create(&RoutingUpdated {
action: RoutingAction::Evicted,
bucket,
Expand All @@ -259,10 +246,7 @@ impl super::Recorder<libp2p_kad::KademliaEvent> for super::Metrics {
}

libp2p_kad::KademliaEvent::InboundRequest { request } => {
self.kad
.inbound_requests
.get_or_create(&request.into())
.inc();
self.inbound_requests.get_or_create(&request.into()).inc();
}
_ => {}
}
Expand Down
52 changes: 52 additions & 0 deletions misc/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,55 @@ pub trait Recorder<Event> {
/// Record the given event.
fn record(&self, event: &Event);
}

#[cfg(feature = "dcutr")]
impl Recorder<libp2p_dcutr::behaviour::Event> for Metrics {
fn record(&self, event: &libp2p_dcutr::behaviour::Event) {
self.dcutr.record(event)
}
}

#[cfg(feature = "gossipsub")]
#[cfg(not(target_os = "unknown"))]
impl Recorder<libp2p_gossipsub::GossipsubEvent> for Metrics {
fn record(&self, event: &libp2p_gossipsub::GossipsubEvent) {
self.gossipsub.record(event)
}
}

#[cfg(feature = "identify")]
impl Recorder<libp2p_identify::IdentifyEvent> for Metrics {
fn record(&self, event: &libp2p_identify::IdentifyEvent) {
self.identify.record(event)
}
}

#[cfg(feature = "kad")]
impl Recorder<libp2p_kad::KademliaEvent> for Metrics {
fn record(&self, event: &libp2p_kad::KademliaEvent) {
self.kad.record(event)
}
}

#[cfg(feature = "ping")]
impl Recorder<libp2p_ping::PingEvent> for Metrics {
fn record(&self, event: &libp2p_ping::PingEvent) {
self.ping.record(event)
}
}

#[cfg(feature = "relay")]
impl Recorder<libp2p_relay::v2::relay::Event> for Metrics {
fn record(&self, event: &libp2p_relay::v2::relay::Event) {
self.relay.record(event)
}
}

impl<TBvEv, THandleErr> Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleErr>> for Metrics {
fn record(&self, event: &libp2p_swarm::SwarmEvent<TBvEv, THandleErr>) {
self.swarm.record(event);

#[cfg(feature = "identify")]
self.identify.record(event)
}
}
Loading