Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(metrics): Expose metrics friendly for dashboard #2804

Merged
merged 1 commit into from
Jun 11, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
551 changes: 279 additions & 272 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
@@ -515,11 +515,16 @@ impl Handler<Status> for ClientActor {
is_slashed,
})
.collect();

let protocol_version = self
.client
.runtime_adapter
.get_epoch_protocol_version(&head.epoch_id)
.map_err(|err| err.to_string())?;

let validator_account_id =
self.client.validator_signer.as_ref().map(|vs| vs.validator_id()).cloned();

Ok(StatusResponse {
version: self.client.config.version.clone(),
protocol_version,
@@ -534,6 +539,7 @@ impl Handler<Status> for ClientActor {
latest_block_time: from_timestamp(latest_block_time),
syncing: self.client.sync_status.is_syncing(),
},
validator_account_id,
})
}
}
9 changes: 9 additions & 0 deletions chain/client/src/info.rs
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@ use log::info;
use sysinfo::{get_current_pid, set_open_files_limit, Pid, ProcessExt, System, SystemExt};

use near_chain_configs::ClientConfig;
use near_metrics::set_gauge;
use near_network::types::NetworkInfo;
use near_primitives::block::Tip;
use near_primitives::network::PeerId;
@@ -20,6 +21,7 @@ use near_primitives::validator_signer::ValidatorSigner;
use near_primitives::version::Version;
use near_telemetry::{telemetry, TelemetryActor};

use crate::metrics;
use crate::types::ShardSyncStatus;
use crate::SyncStatus;

@@ -111,6 +113,13 @@ impl InfoHelper {
Blue.bold().paint(format!("CPU: {:.0}%, Mem: {}", cpu_usage, pretty_bytes(memory_usage * 1024)))
);

set_gauge(&metrics::IS_VALIDATOR, is_validator as i64);
set_gauge(&metrics::RECEIVED_BYTES_PER_SECOND, network_info.received_bytes_per_sec as i64);
set_gauge(&metrics::SENT_BYTES_PER_SECOND, network_info.sent_bytes_per_sec as i64);
set_gauge(&metrics::BLOCKS_PER_MINUTE, (avg_bls * (60 as f64)) as i64);
set_gauge(&metrics::CPU_USAGE, cpu_usage as i64);
set_gauge(&metrics::MEMORY_USAGE, memory_usage as i64);

self.started = Instant::now();
self.num_blocks_processed = 0;
self.gas_used = 0;
18 changes: 17 additions & 1 deletion chain/client/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,24 @@
use near_metrics::{try_create_int_counter, IntCounter};
use near_metrics::{try_create_int_counter, try_create_int_gauge, IntCounter, IntGauge};

// Prometheus metrics exposed by the client crate.
// Every static stores the `Result` of creating the metric, so a failed creation
// is kept as an `Err` value instead of panicking at first use.
lazy_static! {
// Counter of blocks produced by this node since it was started.
pub static ref BLOCK_PRODUCED_TOTAL: near_metrics::Result<IntCounter> = try_create_int_counter(
"block_produced_total",
"Total number of blocks produced since starting this node"
);
// Boolean flag stored as an integer; `InfoHelper` sets it from `is_validator as i64`.
pub static ref IS_VALIDATOR: near_metrics::Result<IntGauge> =
try_create_int_gauge("is_validator", "Bool to denote if it is currently validating");
// Inbound network throughput, set from `network_info.received_bytes_per_sec`.
pub static ref RECEIVED_BYTES_PER_SECOND: near_metrics::Result<IntGauge> = try_create_int_gauge(
"received_bytes_per_second",
"Number of bytes per second received over the network overall"
);
// Outbound network throughput, set from `network_info.sent_bytes_per_sec`.
pub static ref SENT_BYTES_PER_SECOND: near_metrics::Result<IntGauge> = try_create_int_gauge(
"sent_bytes_per_second",
"Number of bytes per second sent over the network overall"
);
// Set from the average blocks-per-second figure multiplied by 60 and truncated to an integer.
pub static ref BLOCKS_PER_MINUTE: near_metrics::Result<IntGauge> =
try_create_int_gauge("blocks_per_minute", "Blocks produced per minute");
// CPU usage in percent, truncated to an integer when the gauge is set.
pub static ref CPU_USAGE: near_metrics::Result<IntGauge> =
try_create_int_gauge("cpu_usage", "Percent of CPU usage");
// NOTE(review): the unit is not stated in the help text. `InfoHelper` multiplies the same
// value by 1024 before pretty-printing it as bytes, so this is presumably KiB - confirm.
pub static ref MEMORY_USAGE: near_metrics::Result<IntGauge> =
try_create_int_gauge("memory_usage", "Amount of RAM memory usage");
}
1 change: 1 addition & 0 deletions chain/network/Cargo.toml
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@ rand = "0.7"
byteorder = "1.2"
lazy_static = "1.4"
tracing = "0.1.13"
strum = { version = "0.18", features = ["derive"] }

borsh = "0.6.2"
cached = "0.12"
141 changes: 74 additions & 67 deletions chain/network/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,10 @@
use near_metrics::{try_create_int_counter, try_create_int_gauge, IntCounter, IntGauge};

/// Declares a pair of lazily-initialised `IntCounter` statics for one peer message type.
///
/// `$name_counter` and `$name_bytes` become the identifiers of the two statics.
/// Each identifier is also stringified and used as the metric name, and the help
/// text is the identifier prefixed with `"Peer Message "`.
macro_rules! type_messages {
($name_counter:ident, $name_bytes:ident) => {
lazy_static! {
pub static ref $name_counter: near_metrics::Result<IntCounter> = try_create_int_counter(
stringify!($name_counter),
concat!("Peer Message ", stringify!($name_counter)),
);
pub static ref $name_bytes: near_metrics::Result<IntCounter> = try_create_int_counter(
stringify!($name_bytes),
concat!("Peer Message ", stringify!($name_bytes)),
);
}
};
}
use crate::types::{PeerMessage, RoutedMessageBody};
use near_metrics::{
inc_counter_by_opt, inc_counter_opt, try_create_histogram, try_create_int_counter,
try_create_int_gauge, Histogram, IntCounter, IntGauge,
};
use std::collections::HashMap;
use strum::VariantNames;

lazy_static! {
pub static ref PEER_CONNECTIONS_TOTAL: near_metrics::Result<IntGauge> =
@@ -47,11 +38,14 @@ lazy_static! {
"routing_table_recalculations",
"Number of times routing table have been recalculated from scratch"
);
pub static ref ROUTING_TABLE_RECALCULATION_MILLISECONDS: near_metrics::Result<IntGauge> =
try_create_int_gauge(
"routing_table_recalculation_milliseconds",


pub static ref ROUTING_TABLE_RECALCULATION_HISTOGRAM: near_metrics::Result<Histogram> =
try_create_histogram(
"routing_table_recalculation_histogram",
"Time spent recalculating routing table"
);

pub static ref EDGE_UPDATES: near_metrics::Result<IntCounter> =
try_create_int_counter(
"edge_updates",
@@ -90,51 +84,64 @@ lazy_static! {
pub static ref RECEIVED_INFO_ABOUT_ITSELF: near_metrics::Result<IntCounter> = try_create_int_counter("received_info_about_itself", "Number of times a peer tried to connect to itself");
}

type_messages!(HANDSHAKE_RECEIVED_TOTAL, HANDSHAKE_RECEIVED_BYTES);
type_messages!(HANDSHAKE_FAILURE_RECEIVED_TOTAL, HANDSHAKE_FAILURE_RECEIVED_BYTES);
type_messages!(SYNC_RECEIVED_TOTAL, SYNC_RECEIVED_BYTES);
type_messages!(REQUEST_UPDATE_NONCE_RECEIVED_TOTAL, REQUEST_UPDATE_NONCE_RECEIVED_BYTES);
type_messages!(RESPONSE_UPDATE_NONCE_RECEIVED_TOTAL, RESPONSE_UPDATE_NONCE_RECEIVED_BYTES);
type_messages!(LAST_EDGE_RECEIVED_TOTAL, LAST_EDGE_RECEIVED_BYTES);
type_messages!(PEERS_REQUEST_RECEIVED_TOTAL, PEERS_REQUEST_RECEIVED_BYTES);
type_messages!(PEERS_RESPONSE_RECEIVED_TOTAL, PEERS_RESPONSE_RECEIVED_BYTES);
type_messages!(BLOCK_HEADERS_REQUEST_RECEIVED_TOTAL, BLOCK_HEADERS_REQUEST_RECEIVED_BYTES);
type_messages!(BLOCK_HEADERS_RECEIVED_TOTAL, BLOCK_HEADERS_RECEIVED_BYTES);
type_messages!(BLOCK_HEADER_ANNOUNCE_RECEIVED_TOTAL, BLOCK_HEADER_ANNOUNCE_RECEIVED_BYTES);
type_messages!(BLOCK_REQUEST_RECEIVED_TOTAL, BLOCK_REQUEST_RECEIVED_BYTES);
type_messages!(BLOCK_RECEIVED_TOTAL, BLOCK_RECEIVED_BYTES);
type_messages!(TRANSACTION_RECEIVED_TOTAL, TRANSACTION_RECEIVED_BYTES);
type_messages!(ROUTED_STATE_REQUEST_RECEIVED_TOTAL, ROUTED_STATE_REQUEST_RECEIVED_BYTES);
type_messages!(ROUTED_STATE_RESPONSE_RECEIVED_TOTAL, ROUTED_STATE_RESPONSE_RECEIVED_BYTES);
type_messages!(ROUTED_BLOCK_APPROVAL_RECEIVED_TOTAL, ROUTED_BLOCK_APPROVAL_RECEIVED_BYTES);
type_messages!(ROUTED_FORWARD_TX_RECEIVED_TOTAL, ROUTED_FORWARD_TX_RECEIVED_BYTES);
type_messages!(ROUTED_TX_STATUS_REQUEST_RECEIVED_TOTAL, ROUTED_TX_STATUS_REQUEST_RECEIVED_BYTES);
type_messages!(ROUTED_TX_STATUS_RESPONSE_RECEIVED_TOTAL, ROUTED_TX_STATUS_RESPONSE_RECEIVED_BYTES);
type_messages!(ROUTED_QUERY_REQUEST_RECEIVED_TOTAL, ROUTED_QUERY_REQUEST_RECEIVED_BYTES);
type_messages!(ROUTED_QUERY_RESPONSE_RECEIVED_TOTAL, ROUTED_QUERY_RESPONSE_RECEIVED_BYTES);
type_messages!(
ROUTED_RECEIPT_OUTCOME_REQUEST_RECEIVED_TOTAL,
ROUTED_RECEIPT_OUTCOME_REQUEST_RECEIVED_BYTES
);
type_messages!(
ROUTED_RECEIPT_OUTCOME_RESPONSE_RECEIVED_TOTAL,
ROUTED_RECEIPT_OUTCOME_RESPONSE_RECEIVED_BYTES
);
type_messages!(
ROUTED_STATE_REQUEST_HEADER_RECEIVED_TOTAL,
ROUTED_STATE_REQUEST_HEADER_RECEIVED_BYTES
);
type_messages!(ROUTED_STATE_REQUEST_PART_RECEIVED_TOTAL, ROUTED_STATE_REQUEST_PART_RECEIVED_BYTES);
type_messages!(
ROUTED_PARTIAL_CHUNK_REQUEST_RECEIVED_TOTAL,
ROUTED_PARTIAL_CHUNK_REQUEST_RECEIVED_BYTES
);
type_messages!(
ROUTED_PARTIAL_CHUNK_RESPONSE_RECEIVED_TOTAL,
ROUTED_PARTIAL_CHUNK_RESPONSE_RECEIVED_BYTES
);
type_messages!(ROUTED_PARTIAL_CHUNK_RECEIVED_TOTAL, ROUTED_PARTIAL_CHUNK_RECEIVED_BYTES);
type_messages!(ROUTED_PING_RECEIVED_TOTAL, ROUTED_PING_RECEIVED_BYTES);
type_messages!(ROUTED_PONG_RECEIVED_TOTAL, ROUTED_PONG_RECEIVED_BYTES);
type_messages!(DISCONNECT_RECEIVED_TOTAL, DISCONNECT_RECEIVED_BYTES);
type_messages!(CHALLENGE_RECEIVED_TOTAL, CHALLENGE_RECEIVED_BYTES);
/// Per-message-type counters, keyed by metric name.
///
/// For every message variant name three counters are kept, named
/// `<Variant>_total`, `<Variant>_bytes` and `<Variant>_dropped` (see the
/// `peer_message_*` helpers below for the exact names).
#[derive(Clone)]
pub struct NetworkMetrics {
    /// Metric name -> counter. The value is `None` when creating the counter
    /// failed; incrementing such an entry is a no-op.
    pub peer_messages: HashMap<String, Option<IntCounter>>,
}

impl Default for NetworkMetrics {
    fn default() -> Self {
        Self::new()
    }
}

impl NetworkMetrics {
    /// Creates the counters for every `PeerMessage` variant except `Routed`,
    /// plus every `RoutedMessageBody` variant.
    ///
    /// `Routed` is skipped because routed messages are accounted under the
    /// variant name of their body instead.
    ///
    /// NOTE(review): creation errors are discarded with `.ok()`. If
    /// `try_create_int_counter` registers into a process-wide registry, a second
    /// call to `new` would presumably yield `None` for every counter - confirm.
    /// Existing callers clone one instance rather than constructing several.
    pub fn new() -> Self {
        // The three metric families tracked per message type, in creation order.
        let name_builders: [fn(&str) -> String; 3] = [
            Self::peer_message_total_rx,
            Self::peer_message_bytes_rx,
            Self::peer_message_dropped,
        ];

        let variants = PeerMessage::VARIANTS
            .iter()
            .filter(|&name| *name != "Routed")
            .chain(RoutedMessageBody::VARIANTS.iter());

        let mut peer_messages = HashMap::new();
        for name in variants {
            for build_name in name_builders.iter() {
                let counter_name = build_name(*name);
                // The metric name doubles as its help text.
                let counter = try_create_int_counter(&counter_name, &counter_name).ok();
                peer_messages.insert(counter_name, counter);
            }
        }

        Self { peer_messages }
    }

    /// Name of the counter tracking how many messages of this type were received.
    pub fn peer_message_total_rx(message_name: &str) -> String {
        format!("{}_total", message_name)
    }

    /// Name of the counter tracking how many bytes of this message type were received.
    pub fn peer_message_bytes_rx(message_name: &str) -> String {
        format!("{}_bytes", message_name)
    }

    /// Name of the counter tracking how many messages of this type were dropped.
    pub fn peer_message_dropped(message_name: &str) -> String {
        format!("{}_dropped", message_name)
    }

    /// Increments the counter registered under `message_name` by one.
    ///
    /// Unknown names and counters that failed to be created are ignored.
    pub fn inc(&self, message_name: &str) {
        if let Some(counter) = self.peer_messages.get(message_name) {
            inc_counter_opt(counter.as_ref());
        }
    }

    /// Increments the counter registered under `message_name` by `value`.
    ///
    /// Unknown names and counters that failed to be created are ignored.
    pub fn inc_by(&self, message_name: &str, value: i64) {
        if let Some(counter) = self.peer_messages.get(message_name) {
            inc_counter_by_opt(counter.as_ref(), value);
        }
    }
}
13 changes: 12 additions & 1 deletion chain/network/src/peer.rs
Original file line number Diff line number Diff line change
@@ -33,6 +33,7 @@ use crate::types::{
};
use crate::PeerManagerActor;
use crate::{metrics, NetworkResponses};
use metrics::NetworkMetrics;

type WriteHalf = tokio::io::WriteHalf<tokio::net::TcpStream>;

@@ -156,6 +157,8 @@ pub struct Peer {
edge_info: Option<EdgeInfo>,
/// Last time an update of received message was sent to PeerManager
last_time_received_message_update: Instant,
/// Dynamic Prometheus metrics
network_metrics: NetworkMetrics,
}

impl Peer {
@@ -170,6 +173,7 @@ impl Peer {
client_addr: Recipient<NetworkClientMessages>,
view_client_addr: Recipient<NetworkViewClientMessages>,
edge_info: Option<EdgeInfo>,
network_metrics: NetworkMetrics,
) -> Self {
Peer {
node_info,
@@ -187,6 +191,7 @@ impl Peer {
chain_info: Default::default(),
edge_info,
last_time_received_message_update: Instant::now(),
network_metrics,
}
}

@@ -609,7 +614,13 @@ impl StreamHandler<Vec<u8>> for Peer {
self.peer_manager_addr.do_send(metadata);
}

peer_msg.record(msg.len());
self.network_metrics
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we put this behind "metric_recorder" or some other flag?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm OK with putting it behind a flag as stated below, but since I think it should be enabled by default, metric_recorder is not a good flag: we don't want to enable metric_recorder by default, as it consumes more resources.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I am reading here confuses me. It sounds like metric_recorder is not a very descriptive name if we don't want to include all the recorded metrics under it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens in practice is that metric_recorder stores too much information with little aggregation. It has been useful for tracking down some issues, but we don't really want to put all metrics there, since some of them should be exposed anyway. I can change the name to extra_metrics.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extra_metrics and slow_metrics sound good to me

.inc(NetworkMetrics::peer_message_total_rx(&peer_msg.msg_variant()).as_ref());

self.network_metrics.inc_by(
NetworkMetrics::peer_message_bytes_rx(&peer_msg.msg_variant()).as_ref(),
msg.len() as i64,
);

match (self.peer_type, self.peer_status, peer_msg) {
(_, PeerStatus::Connecting, PeerMessage::HandshakeFailure(peer_info, reason)) => {
14 changes: 13 additions & 1 deletion chain/network/src/peer_manager.rs
Original file line number Diff line number Diff line change
@@ -44,6 +44,7 @@ use crate::types::{
KnownPeerState, NetworkClientMessages, NetworkConfig, NetworkRequests, NetworkResponses,
PeerInfo,
};
use metrics::NetworkMetrics;

/// How often to request peers from active peers.
const REQUEST_PEERS_SECS: u64 = 60;
@@ -105,6 +106,8 @@ pub struct PeerManagerActor {
monitor_peers_attempts: u64,
/// Active peers we have sent new edge update, but we haven't received response so far.
pending_update_nonce_request: HashMap<PeerId, u64>,
/// Dynamic Prometheus metrics
network_metrics: NetworkMetrics,
/// Store all collected metrics from a node.
#[cfg(feature = "metric_recorder")]
metric_recorder: MetricRecorder,
@@ -138,6 +141,7 @@ impl PeerManagerActor {
routing_table,
monitor_peers_attempts: 0,
pending_update_nonce_request: HashMap::new(),
network_metrics: NetworkMetrics::new(),
#[cfg(feature = "metric_recorder")]
metric_recorder,
})
@@ -350,6 +354,8 @@ impl PeerManagerActor {
}
};

let network_metrics = self.network_metrics.clone();

// Start every peer actor on separate thread.
let arbiter = Arbiter::new();
Peer::start_in_arbiter(&arbiter, move |ctx| {
@@ -368,6 +374,7 @@ impl PeerManagerActor {
.map(Result::unwrap),
ctx,
);

Peer::new(
PeerInfo { id: peer_id, addr: Some(server_addr), account_id },
remote_addr,
@@ -379,6 +386,7 @@ impl PeerManagerActor {
client_addr,
view_client_addr,
edge_info,
network_metrics,
)
});
}
@@ -820,7 +828,11 @@ impl PeerManagerActor {
}
Err(find_route_error) => {
// TODO(MarX, #1369): Message is dropped here. Define policy for this case.
near_metrics::inc_counter(&metrics::DROP_MESSAGE_UNREACHABLE_PEER);
self.network_metrics.inc(
NetworkMetrics::peer_message_dropped(strum::AsStaticRef::as_static(&msg.body))
.as_str(),
);

debug!(target: "network", "{:?} Drop signed message to {:?} Reason {:?}. Known peers: {:?} Message {:?}",
self.config.account_id,
msg.target,
11 changes: 3 additions & 8 deletions chain/network/src/routing.rs
Original file line number Diff line number Diff line change
@@ -664,11 +664,12 @@ impl RoutingTable {

/// Recalculate routing table.
pub fn update(&mut self) {
let _routing_table_recalculation =
near_metrics::start_timer(&metrics::ROUTING_TABLE_RECALCULATION_HISTOGRAM);

trace!(target: "network", "Update routing table.");
self.recalculation_scheduled = None;

let start = Instant::now();

self.peer_forwarding = self.raw_graph.calculate_distance();

let now = chrono::Utc::now();
@@ -678,13 +679,7 @@ impl RoutingTable {

self.try_save_edges();

let duration = Instant::now().duration_since(start).as_millis();

near_metrics::inc_counter_by(&metrics::ROUTING_TABLE_RECALCULATIONS, 1);
near_metrics::set_gauge(
&metrics::ROUTING_TABLE_RECALCULATION_MILLISECONDS,
duration as i64,
);
near_metrics::set_gauge(&metrics::PEER_REACHABLE, self.peer_forwarding.len() as i64);
}

264 changes: 27 additions & 237 deletions chain/network/src/types.rs
Original file line number Diff line number Diff line change
@@ -18,7 +18,6 @@ use tracing::{error, warn};
use near_chain::types::ShardStateSyncResponse;
use near_chain::{Block, BlockHeader};
use near_crypto::{PublicKey, SecretKey, Signature};
use near_metrics;
use near_primitives::block::{Approval, ApprovalMessage, GenesisId};
use near_primitives::challenge::Challenge;
use near_primitives::errors::InvalidTxError;
@@ -33,7 +32,6 @@ use near_primitives::utils::{from_timestamp, to_timestamp};
use near_primitives::version::FIRST_BACKWARD_COMPATIBLE_PROTOCOL_VERSION;
use near_primitives::views::{FinalExecutionOutcomeView, QueryRequest, QueryResponse};

use crate::metrics;
use crate::peer::Peer;
#[cfg(feature = "metric_recorder")]
use crate::recorder::MetricRecorder;
@@ -216,7 +214,17 @@ pub struct Pong {
}

// TODO(#1313): Use Box
#[derive(BorshSerialize, BorshDeserialize, Serialize, PartialEq, Eq, Clone, Debug)]
#[derive(
BorshSerialize,
BorshDeserialize,
Serialize,
PartialEq,
Eq,
Clone,
Debug,
strum::AsStaticStr,
strum::EnumVariantNames,
)]
#[allow(clippy::large_enum_variant)]
pub enum RoutedMessageBody {
BlockApproval(Approval),
@@ -395,7 +403,17 @@ impl SyncData {
/// Warning, position of each message type in this enum defines the protocol due to serialization.
/// DO NOT MOVE, REORDER, DELETE items from the list. Only add new items to the end.
/// If need to remove old items - replace with `None`.
#[derive(BorshSerialize, BorshDeserialize, Serialize, PartialEq, Eq, Clone, Debug)]
#[derive(
BorshSerialize,
BorshDeserialize,
Serialize,
PartialEq,
Eq,
Clone,
Debug,
strum::AsStaticStr,
strum::EnumVariantNames,
)]
// TODO(#1313): Use Box
#[allow(clippy::large_enum_variant)]
pub enum PeerMessage {
@@ -428,245 +446,17 @@ pub enum PeerMessage {

impl fmt::Display for PeerMessage {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
PeerMessage::Handshake(_) => f.write_str("Handshake"),
PeerMessage::HandshakeFailure(_, _) => f.write_str("HandshakeFailure"),
PeerMessage::Sync(_) => f.write_str("Sync"),
PeerMessage::RequestUpdateNonce(_) => f.write_str("RequestUpdateNonce"),
PeerMessage::ResponseUpdateNonce(_) => f.write_str("ResponseUpdateNonce"),
PeerMessage::LastEdge(_) => f.write_str("LastEdge"),
PeerMessage::PeersRequest => f.write_str("PeersRequest"),
PeerMessage::PeersResponse(_) => f.write_str("PeersResponse"),
PeerMessage::BlockHeadersRequest(_) => f.write_str("BlockHeaderRequest"),
PeerMessage::BlockHeaders(_) => f.write_str("BlockHeaders"),
PeerMessage::BlockRequest(_) => f.write_str("BlockRequest"),
PeerMessage::Block(_) => f.write_str("Block"),
PeerMessage::Transaction(_) => f.write_str("Transaction"),
PeerMessage::Routed(routed_message) => match routed_message.body {
RoutedMessageBody::BlockApproval(_) => f.write_str("BlockApproval"),
RoutedMessageBody::ForwardTx(_) => f.write_str("ForwardTx"),
RoutedMessageBody::TxStatusRequest(_, _) => f.write_str("Transaction status query"),
RoutedMessageBody::TxStatusResponse(_) => {
f.write_str("Transaction status response")
}
RoutedMessageBody::QueryRequest { .. } => f.write_str("Query request"),
RoutedMessageBody::QueryResponse { .. } => f.write_str("Query response"),
RoutedMessageBody::StateRequestHeader(_, _) => f.write_str("StateRequestHeader"),
RoutedMessageBody::StateRequestPart(_, _, _) => f.write_str("StateRequestPart"),
RoutedMessageBody::StateResponse(_) => f.write_str("StateResponse"),
RoutedMessageBody::ReceiptOutcomeRequest(_) => {
f.write_str("Receipt outcome request")
}
RoutedMessageBody::ReceiptOutComeResponse(_) => {
f.write_str("Receipt outcome response")
}
RoutedMessageBody::PartialEncodedChunkRequest(_) => {
f.write_str("PartialEncodedChunkRequest")
}
RoutedMessageBody::PartialEncodedChunkResponse(_) => {
f.write_str("PartialEncodedChunkResponse")
}
RoutedMessageBody::PartialEncodedChunk(_) => f.write_str("PartialEncodedChunk"),
RoutedMessageBody::Ping(_) => f.write_str("Ping"),
RoutedMessageBody::Pong(_) => f.write_str("Pong"),
},
PeerMessage::Disconnect => f.write_str("Disconnect"),
PeerMessage::Challenge(_) => f.write_str("Challenge"),
}
f.write_str(format!("{}", self.msg_variant()).as_ref())
}
}

impl PeerMessage {
pub fn record(&self, size: usize) {
pub fn msg_variant(&self) -> &str {
match self {
PeerMessage::Handshake(_) => {
near_metrics::inc_counter(&metrics::HANDSHAKE_RECEIVED_TOTAL);
near_metrics::inc_counter_by(&metrics::HANDSHAKE_RECEIVED_BYTES, size as i64);
}
PeerMessage::HandshakeFailure(_, _) => {
near_metrics::inc_counter(&metrics::HANDSHAKE_FAILURE_RECEIVED_TOTAL);
near_metrics::inc_counter_by(
&metrics::HANDSHAKE_FAILURE_RECEIVED_BYTES,
size as i64,
);
}
PeerMessage::Sync(_) => {
near_metrics::inc_counter(&metrics::SYNC_RECEIVED_TOTAL);
near_metrics::inc_counter_by(&metrics::SYNC_RECEIVED_BYTES, size as i64);
}
PeerMessage::RequestUpdateNonce(_) => {
near_metrics::inc_counter(&metrics::REQUEST_UPDATE_NONCE_RECEIVED_TOTAL);
near_metrics::inc_counter_by(
&metrics::REQUEST_UPDATE_NONCE_RECEIVED_BYTES,
size as i64,
);
}
PeerMessage::ResponseUpdateNonce(_) => {
near_metrics::inc_counter(&metrics::RESPONSE_UPDATE_NONCE_RECEIVED_TOTAL);
near_metrics::inc_counter_by(
&metrics::RESPONSE_UPDATE_NONCE_RECEIVED_BYTES,
size as i64,
);
}
PeerMessage::LastEdge(_) => {
near_metrics::inc_counter(&metrics::LAST_EDGE_RECEIVED_TOTAL);
near_metrics::inc_counter_by(&metrics::LAST_EDGE_RECEIVED_BYTES, size as i64);
}
PeerMessage::PeersRequest => {
near_metrics::inc_counter(&metrics::PEERS_REQUEST_RECEIVED_TOTAL);
near_metrics::inc_counter_by(&metrics::PEERS_REQUEST_RECEIVED_BYTES, size as i64);
}
PeerMessage::PeersResponse(_) => {
near_metrics::inc_counter(&metrics::PEERS_RESPONSE_RECEIVED_TOTAL);
near_metrics::inc_counter_by(&metrics::PEERS_RESPONSE_RECEIVED_BYTES, size as i64);
}
PeerMessage::BlockHeadersRequest(_) => {
near_metrics::inc_counter(&metrics::BLOCK_HEADERS_REQUEST_RECEIVED_TOTAL);
near_metrics::inc_counter_by(
&metrics::BLOCK_HEADERS_REQUEST_RECEIVED_BYTES,
size as i64,
);
}
PeerMessage::BlockHeaders(_) => {
near_metrics::inc_counter(&metrics::BLOCK_HEADERS_RECEIVED_TOTAL);
near_metrics::inc_counter_by(&metrics::BLOCK_HEADERS_RECEIVED_BYTES, size as i64);
}
PeerMessage::BlockRequest(_) => {
near_metrics::inc_counter(&metrics::BLOCK_REQUEST_RECEIVED_TOTAL);
near_metrics::inc_counter_by(&metrics::BLOCK_REQUEST_RECEIVED_BYTES, size as i64);
}
PeerMessage::Block(_) => {
near_metrics::inc_counter(&metrics::BLOCK_RECEIVED_TOTAL);
near_metrics::inc_counter_by(&metrics::BLOCK_RECEIVED_BYTES, size as i64);
}
PeerMessage::Transaction(_) => {
near_metrics::inc_counter(&metrics::TRANSACTION_RECEIVED_TOTAL);
near_metrics::inc_counter_by(&metrics::TRANSACTION_RECEIVED_BYTES, size as i64);
}
PeerMessage::Routed(routed_message) => match routed_message.body {
RoutedMessageBody::BlockApproval(_) => {
near_metrics::inc_counter(&metrics::ROUTED_BLOCK_APPROVAL_RECEIVED_TOTAL);
near_metrics::inc_counter_by(
&metrics::ROUTED_BLOCK_APPROVAL_RECEIVED_BYTES,
size as i64,
);
}
RoutedMessageBody::ForwardTx(_) => {
near_metrics::inc_counter(&metrics::ROUTED_FORWARD_TX_RECEIVED_TOTAL);
near_metrics::inc_counter_by(
&metrics::ROUTED_FORWARD_TX_RECEIVED_BYTES,
size as i64,
);
}
RoutedMessageBody::TxStatusRequest(_, _) => {
near_metrics::inc_counter(&metrics::ROUTED_TX_STATUS_REQUEST_RECEIVED_TOTAL);
near_metrics::inc_counter_by(
&metrics::ROUTED_TX_STATUS_REQUEST_RECEIVED_BYTES,
size as i64,
);
}
RoutedMessageBody::TxStatusResponse(_) => {
near_metrics::inc_counter(&metrics::ROUTED_TX_STATUS_RESPONSE_RECEIVED_TOTAL);
near_metrics::inc_counter_by(
&metrics::ROUTED_TX_STATUS_RESPONSE_RECEIVED_BYTES,
size as i64,
);
}
RoutedMessageBody::QueryRequest { .. } => {
near_metrics::inc_counter(&metrics::ROUTED_QUERY_REQUEST_RECEIVED_TOTAL);
near_metrics::inc_counter_by(
&metrics::ROUTED_QUERY_REQUEST_RECEIVED_BYTES,
size as i64,
);
}
RoutedMessageBody::QueryResponse { .. } => {
near_metrics::inc_counter(&metrics::ROUTED_QUERY_RESPONSE_RECEIVED_TOTAL);
near_metrics::inc_counter_by(
&metrics::ROUTED_QUERY_RESPONSE_RECEIVED_BYTES,
size as i64,
);
}
RoutedMessageBody::ReceiptOutcomeRequest(_) => {
near_metrics::inc_counter(
&metrics::ROUTED_RECEIPT_OUTCOME_REQUEST_RECEIVED_TOTAL,
);
near_metrics::inc_counter_by(
&metrics::ROUTED_RECEIPT_OUTCOME_REQUEST_RECEIVED_BYTES,
size as i64,
);
}
RoutedMessageBody::ReceiptOutComeResponse(_) => {
near_metrics::inc_counter(
&metrics::ROUTED_RECEIPT_OUTCOME_RESPONSE_RECEIVED_TOTAL,
);
near_metrics::inc_counter_by(
&metrics::ROUTED_RECEIPT_OUTCOME_RESPONSE_RECEIVED_BYTES,
size as i64,
);
}
RoutedMessageBody::StateRequestHeader(_, _) => {
near_metrics::inc_counter(&metrics::ROUTED_STATE_REQUEST_HEADER_RECEIVED_TOTAL);
near_metrics::inc_counter_by(
&metrics::ROUTED_STATE_REQUEST_HEADER_RECEIVED_BYTES,
size as i64,
);
}
RoutedMessageBody::StateRequestPart(_, _, _) => {
near_metrics::inc_counter(&metrics::ROUTED_STATE_REQUEST_PART_RECEIVED_TOTAL);
near_metrics::inc_counter_by(
&metrics::ROUTED_STATE_REQUEST_PART_RECEIVED_BYTES,
size as i64,
);
}
RoutedMessageBody::StateResponse(_) => {
near_metrics::inc_counter(&metrics::ROUTED_STATE_RESPONSE_RECEIVED_TOTAL);
near_metrics::inc_counter_by(
&metrics::ROUTED_STATE_RESPONSE_RECEIVED_BYTES,
size as i64,
);
}
RoutedMessageBody::PartialEncodedChunkRequest(_) => {
near_metrics::inc_counter(
&metrics::ROUTED_PARTIAL_CHUNK_REQUEST_RECEIVED_TOTAL,
);
near_metrics::inc_counter_by(
&metrics::ROUTED_PARTIAL_CHUNK_REQUEST_RECEIVED_BYTES,
size as i64,
);
}
RoutedMessageBody::PartialEncodedChunkResponse(_) => {
near_metrics::inc_counter(
&metrics::ROUTED_PARTIAL_CHUNK_RESPONSE_RECEIVED_TOTAL,
);
near_metrics::inc_counter_by(
&metrics::ROUTED_PARTIAL_CHUNK_RESPONSE_RECEIVED_BYTES,
size as i64,
);
}
RoutedMessageBody::PartialEncodedChunk(_) => {
near_metrics::inc_counter(&metrics::ROUTED_PARTIAL_CHUNK_RECEIVED_TOTAL);
near_metrics::inc_counter_by(
&metrics::ROUTED_PARTIAL_CHUNK_RECEIVED_BYTES,
size as i64,
);
}
RoutedMessageBody::Ping(_) => {
near_metrics::inc_counter(&metrics::ROUTED_PING_RECEIVED_TOTAL);
near_metrics::inc_counter_by(&metrics::ROUTED_PING_RECEIVED_BYTES, size as i64);
}
RoutedMessageBody::Pong(_) => {
near_metrics::inc_counter(&metrics::ROUTED_PONG_RECEIVED_TOTAL);
near_metrics::inc_counter_by(&metrics::ROUTED_PONG_RECEIVED_BYTES, size as i64);
}
},
PeerMessage::Disconnect => {
near_metrics::inc_counter(&metrics::DISCONNECT_RECEIVED_TOTAL);
near_metrics::inc_counter_by(&metrics::DISCONNECT_RECEIVED_BYTES, size as i64);
}
PeerMessage::Challenge(_) => {
near_metrics::inc_counter(&metrics::CHALLENGE_RECEIVED_TOTAL);
near_metrics::inc_counter_by(&metrics::CHALLENGE_RECEIVED_BYTES, size as i64);
PeerMessage::Routed(routed_message) => {
strum::AsStaticRef::as_static(&routed_message.body)
}
_ => strum::AsStaticRef::as_static(self),
}
}

12 changes: 12 additions & 0 deletions core/metrics/src/lib.rs
Original file line number Diff line number Diff line change
@@ -126,6 +126,12 @@ pub fn inc_counter(counter: &Result<IntCounter>) {
}
}

/// Increments `counter` by one when it is present; does nothing for `None`.
pub fn inc_counter_opt(counter: Option<&IntCounter>) {
    match counter {
        Some(present) => {
            present.inc();
        }
        None => {}
    }
}

pub fn get_counter(counter: &Result<IntCounter>) -> std::result::Result<i64, String> {
if let Ok(counter) = counter {
Ok(counter.get())
@@ -142,6 +148,12 @@ pub fn inc_counter_by(counter: &Result<IntCounter>, value: i64) {
}
}

/// Increments `counter` by `value` when it is present; does nothing for `None`.
pub fn inc_counter_by_opt(counter: Option<&IntCounter>, value: i64) {
    match counter {
        Some(present) => {
            present.inc_by(value);
        }
        None => {}
    }
}

pub fn set_gauge(gauge: &Result<IntGauge>, value: i64) {
if let Ok(gauge) = gauge {
gauge.set(value);
2 changes: 2 additions & 0 deletions core/primitives/src/views.rs
Original file line number Diff line number Diff line change
@@ -266,6 +266,8 @@ pub struct StatusResponse {
pub validators: Vec<ValidatorInfo>,
/// Sync status of the node.
pub sync_info: StatusSyncInfo,
/// Validator id of the node
pub validator_account_id: Option<AccountId>,
}

impl TryFrom<QueryResponse> for AccountView {