Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve libp2p connected peer metrics #4870

Merged
merged 4 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use sysinfo::{System, SystemExt};
use system_health::observe_system_health_bn;
use system_health::{observe_nat, observe_system_health_bn};
use task_spawner::{Priority, TaskSpawner};
use tokio::sync::{
mpsc::{Sender, UnboundedSender},
Expand Down Expand Up @@ -4047,13 +4047,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.then(|task_spawner: TaskSpawner<T::EthSpec>| {
task_spawner.blocking_json_task(Priority::P1, move || {
Ok(api_types::GenericResponse::from(
lighthouse_network::metrics::NAT_OPEN
.as_ref()
.map(|v| v.get())
.unwrap_or(0)
!= 0,
))
Ok(api_types::GenericResponse::from(observe_nat()))
})
});

Expand Down
8 changes: 5 additions & 3 deletions beacon_node/lighthouse_network/src/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1011,11 +1011,13 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
}
Discv5Event::SocketUpdated(socket_addr) => {
info!(self.log, "Address updated"; "ip" => %socket_addr.ip(), "udp_port" => %socket_addr.port());
metrics::inc_counter(&metrics::ADDRESS_UPDATE_COUNT);
metrics::check_nat();
// We have SOCKET_UPDATED messages. This occurs when discovery has a majority of
// users reporting an external port and our ENR gets updated.
// Which means we are able to do NAT traversal.
metrics::set_gauge_vec(&metrics::NAT_OPEN, &["discv5"], 1);

// Discv5 will have updated our local ENR. We save the updated version
// to disk.

if (self.update_ports.tcp4 && socket_addr.is_ipv4())
|| (self.update_ports.tcp6 && socket_addr.is_ipv6())
{
Expand Down
68 changes: 11 additions & 57 deletions beacon_node/lighthouse_network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,17 @@ use std::sync::Arc;
pub use lighthouse_metrics::*;

lazy_static! {
pub static ref NAT_OPEN: Result<IntCounter> = try_create_int_counter(
pub static ref NAT_OPEN: Result<IntGaugeVec> = try_create_int_gauge_vec(
"nat_open",
"An estimate indicating if the local node is exposed to the internet."
"An estimate indicating if the local node is reachable from external nodes",
&["protocol"]
);
pub static ref ADDRESS_UPDATE_COUNT: Result<IntCounter> = try_create_int_counter(
"libp2p_address_update_total",
"Count of libp2p socked updated events (when our view of our IP address has changed)"
);
pub static ref PEERS_CONNECTED: Result<IntGauge> = try_create_int_gauge(
"libp2p_peers",
"Count of libp2p peers currently connected"
);

pub static ref TCP_PEERS_CONNECTED: Result<IntGauge> = try_create_int_gauge(
"libp2p_tcp_peers",
"Count of libp2p peers currently connected via TCP"
);

pub static ref QUIC_PEERS_CONNECTED: Result<IntGauge> = try_create_int_gauge(
"libp2p_quic_peers",
"Count of libp2p peers currently connected via QUIC"
);

pub static ref PEERS_CONNECTED: Result<IntGaugeVec> =
try_create_int_gauge_vec("libp2p_peers", "Count of libp2p peers currently connected", &["direction", "transport"]);
pub static ref PEER_CONNECT_EVENT_COUNT: Result<IntCounter> = try_create_int_counter(
"libp2p_peer_connect_event_total",
"Count of libp2p peer connect events (not the current number of connected peers)"
Expand All @@ -35,13 +23,10 @@ lazy_static! {
"libp2p_peer_disconnect_event_total",
"Count of libp2p peer disconnect events"
);
pub static ref DISCOVERY_SENT_BYTES: Result<IntGauge> = try_create_int_gauge(
"discovery_sent_bytes",
"The number of bytes sent in discovery"
);
pub static ref DISCOVERY_RECV_BYTES: Result<IntGauge> = try_create_int_gauge(
"discovery_recv_bytes",
"The number of bytes received in discovery"
pub static ref DISCOVERY_BYTES: Result<IntGaugeVec> = try_create_int_gauge_vec(
"discovery_bytes",
"The number of bytes sent and received in discovery",
&["direction"]
);
pub static ref DISCOVERY_QUEUE: Result<IntGauge> = try_create_int_gauge(
"discovery_queue_size",
Expand Down Expand Up @@ -138,17 +123,6 @@ lazy_static! {
&["type"]
);

/*
* Inbound/Outbound peers
*/
/// The number of peers that dialed us.
pub static ref NETWORK_INBOUND_PEERS: Result<IntGauge> =
try_create_int_gauge("network_inbound_peers","The number of peers that are currently connected that have dialed us.");

/// The number of peers that we dialed us.
pub static ref NETWORK_OUTBOUND_PEERS: Result<IntGauge> =
try_create_int_gauge("network_outbound_peers","The number of peers that are currently connected that we dialed.");

/*
* Peer Reporting
*/
Expand All @@ -159,33 +133,13 @@ lazy_static! {
);
}

/// Checks if we consider the NAT open.
///
/// Conditions for an open NAT:
/// 1. We have 1 or more SOCKET_UPDATED messages. This occurs when discovery has a majority of
/// users reporting an external port and our ENR gets updated.
/// 2. We have 0 SOCKET_UPDATED messages (can be true if the port was correct on boot), then we
/// rely on whether we have any inbound messages. If we have no socket update messages, but
/// manage to get at least one inbound peer, we are exposed correctly.
pub fn check_nat() {
// NAT is already deemed open.
if NAT_OPEN.as_ref().map(|v| v.get()).unwrap_or(0) != 0 {
return;
}
if ADDRESS_UPDATE_COUNT.as_ref().map(|v| v.get()).unwrap_or(0) != 0
|| NETWORK_INBOUND_PEERS.as_ref().map(|v| v.get()).unwrap_or(0) != 0_i64
{
inc_counter(&NAT_OPEN);
}
}

pub fn scrape_discovery_metrics() {
let metrics =
discv5::metrics::Metrics::from(discv5::Discv5::<discv5::DefaultProtocolId>::raw_metrics());
set_float_gauge(&DISCOVERY_REQS, metrics.unsolicited_requests_per_second);
set_gauge(&DISCOVERY_SESSIONS, metrics.active_sessions as i64);
set_gauge(&DISCOVERY_SENT_BYTES, metrics.bytes_sent as i64);
set_gauge(&DISCOVERY_RECV_BYTES, metrics.bytes_recv as i64);
set_gauge_vec(&DISCOVERY_BYTES, &["inbound"], metrics.bytes_recv as i64);
set_gauge_vec(&DISCOVERY_BYTES, &["outbound"], metrics.bytes_sent as i64);
}

/// Aggregated `BandwidthSinks` of tcp and quic transports
Expand Down
18 changes: 0 additions & 18 deletions beacon_node/lighthouse_network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -714,29 +714,14 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
return;
}

let mut connected_peer_count = 0;
let mut inbound_connected_peers = 0;
let mut outbound_connected_peers = 0;
let mut clients_per_peer = HashMap::new();

for (_peer, peer_info) in self.network_globals.peers.read().connected_peers() {
connected_peer_count += 1;
if let PeerConnectionStatus::Connected { n_in, .. } = peer_info.connection_status() {
if *n_in > 0 {
inbound_connected_peers += 1;
} else {
outbound_connected_peers += 1;
}
}
*clients_per_peer
.entry(peer_info.client().kind.to_string())
.or_default() += 1;
}

metrics::set_gauge(&metrics::PEERS_CONNECTED, connected_peer_count);
metrics::set_gauge(&metrics::NETWORK_INBOUND_PEERS, inbound_connected_peers);
metrics::set_gauge(&metrics::NETWORK_OUTBOUND_PEERS, outbound_connected_peers);

for client_kind in ClientKind::iter() {
let value = clients_per_peer.get(&client_kind.to_string()).unwrap_or(&0);
metrics::set_gauge_vec(
Expand Down Expand Up @@ -841,11 +826,8 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
// start a ping and status timer for the peer
self.status_peers.insert(*peer_id);

let connected_peers = self.network_globals.connected_peers() as i64;

// increment prometheus metrics
metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT);
metrics::set_gauge(&metrics::PEERS_CONNECTED, connected_peers);

true
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
self.on_dial_failure(peer_id);
}
FromSwarm::ExternalAddrConfirmed(_) => {
// We have an external address confirmed, means we are able to do NAT traversal.
metrics::set_gauge_vec(&metrics::NAT_OPEN, &["libp2p"], 1);
// TODO: we likely want to check this against our assumed external tcp
// address
}
Expand Down Expand Up @@ -247,25 +249,25 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
self.events.push(PeerManagerEvent::MetaData(peer_id));
}

// Check NAT if metrics are enabled
if self.network_globals.local_enr.read().udp4().is_some() {
metrics::check_nat();
}

// increment prometheus metrics
if self.metrics_enabled {
let remote_addr = endpoint.get_remote_address();
let direction = if endpoint.is_dialer() {
"outbound"
} else {
"inbound"
};
match remote_addr.iter().find(|proto| {
matches!(
proto,
multiaddr::Protocol::QuicV1 | multiaddr::Protocol::Tcp(_)
)
}) {
Some(multiaddr::Protocol::QuicV1) => {
metrics::inc_gauge(&metrics::QUIC_PEERS_CONNECTED);
metrics::inc_gauge_vec(&metrics::PEERS_CONNECTED, &[direction, "quic"]);
}
Some(multiaddr::Protocol::Tcp(_)) => {
metrics::inc_gauge(&metrics::TCP_PEERS_CONNECTED);
metrics::inc_gauge_vec(&metrics::PEERS_CONNECTED, &[direction, "tcp"]);
}
Some(_) => unreachable!(),
None => {
Expand Down Expand Up @@ -343,17 +345,23 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
let remote_addr = endpoint.get_remote_address();
// Update the prometheus metrics
if self.metrics_enabled {
let direction = if endpoint.is_dialer() {
"outbound"
} else {
"inbound"
};

match remote_addr.iter().find(|proto| {
matches!(
proto,
multiaddr::Protocol::QuicV1 | multiaddr::Protocol::Tcp(_)
)
}) {
Some(multiaddr::Protocol::QuicV1) => {
metrics::dec_gauge(&metrics::QUIC_PEERS_CONNECTED);
metrics::dec_gauge_vec(&metrics::PEERS_CONNECTED, &[direction, "quic"]);
}
Some(multiaddr::Protocol::Tcp(_)) => {
metrics::dec_gauge(&metrics::TCP_PEERS_CONNECTED);
metrics::dec_gauge_vec(&metrics::PEERS_CONNECTED, &[direction, "tcp"]);
}
// If it's an unknown protocol we already logged when connection was established.
_ => {}
Expand Down
36 changes: 20 additions & 16 deletions beacon_node/network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,22 +319,26 @@ lazy_static! {
}

pub fn update_bandwidth_metrics(bandwidth: &AggregatedBandwidthSinks) {
if let Some(tcp_in_bandwidth) = get_int_counter(&LIBP2P_BYTES, &["inbound", "tcp"]) {
tcp_in_bandwidth.reset();
tcp_in_bandwidth.inc_by(bandwidth.total_tcp_inbound());
}
if let Some(tcp_out_bandwidth) = get_int_counter(&LIBP2P_BYTES, &["outbound", "tcp"]) {
tcp_out_bandwidth.reset();
tcp_out_bandwidth.inc_by(bandwidth.total_tcp_outbound());
}
if let Some(quic_in_bandwidth) = get_int_counter(&LIBP2P_BYTES, &["inbound", "quic"]) {
quic_in_bandwidth.reset();
quic_in_bandwidth.inc_by(bandwidth.total_quic_inbound());
}
if let Some(quic_out_bandwidth) = get_int_counter(&LIBP2P_BYTES, &["outbound", "quic"]) {
quic_out_bandwidth.reset();
quic_out_bandwidth.inc_by(bandwidth.total_quic_outbound());
}
set_counter_vec_by(
&LIBP2P_BYTES,
&["inbound", "tcp"],
bandwidth.total_tcp_inbound(),
);
set_counter_vec_by(
&LIBP2P_BYTES,
&["outbound", "tcp"],
bandwidth.total_tcp_outbound(),
);
set_counter_vec_by(
&LIBP2P_BYTES,
&["inbound", "quic"],
bandwidth.total_quic_inbound(),
);
set_counter_vec_by(
&LIBP2P_BYTES,
&["outbound", "quic"],
bandwidth.total_quic_outbound(),
);
}

pub fn register_finality_update_error(error: &LightClientFinalityUpdateError) {
Expand Down
10 changes: 10 additions & 0 deletions common/lighthouse_metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,16 @@ pub fn inc_counter_vec(int_counter_vec: &Result<IntCounterVec>, name: &[&str]) {
}
}

/// Sets the `int_counter_vec` with the given `name` to the `amount`,
/// should only be called with `ammount`s equal or above the current value
/// as per Prometheus spec, the `counter` type should only go up.
pub fn set_counter_vec_by(int_counter_vec: &Result<IntCounterVec>, name: &[&str], amount: u64) {
if let Some(counter) = get_int_counter(int_counter_vec, name) {
counter.reset();
counter.inc_by(amount);
}
}

pub fn inc_counter_vec_by(int_counter_vec: &Result<IntCounterVec>, name: &[&str], amount: u64) {
if let Some(counter) = get_int_counter(int_counter_vec, name) {
counter.inc_by(amount);
Expand Down
25 changes: 20 additions & 5 deletions common/system_health/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,25 @@ pub fn observe_system_health_vc(
}
}

/// Observes if NAT traversal is possible.
pub fn observe_nat() -> bool {
let discv5_nat = lighthouse_network::metrics::get_int_gauge(
&lighthouse_network::metrics::NAT_OPEN,
&["discv5"],
)
.map(|g| g.get() == 1)
.unwrap_or_default();

let libp2p_nat = lighthouse_network::metrics::get_int_gauge(
&lighthouse_network::metrics::NAT_OPEN,
&["libp2p"],
)
.map(|g| g.get() == 1)
.unwrap_or_default();

discv5_nat && libp2p_nat
}

/// Observes the Beacon Node system health.
pub fn observe_system_health_bn<TSpec: EthSpec>(
sysinfo: Arc<RwLock<System>>,
Expand All @@ -223,11 +242,7 @@ pub fn observe_system_health_bn<TSpec: EthSpec>(
.unwrap_or_else(|| (String::from("None"), 0, 0));

// Determine if the NAT is open or not.
let nat_open = lighthouse_network::metrics::NAT_OPEN
.as_ref()
.map(|v| v.get())
.unwrap_or(0)
!= 0;
let nat_open = observe_nat();

SystemHealthBN {
system_health,
Expand Down