Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve backlog metrics #3722

Merged
merged 13 commits into from
Dec 14, 2023
Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
- Update the values of `backlog` metrics when clearing packets.
Change the `backlog_oldest_timestamp` to `backlog_latest_update_timestamp`
which shows the last time the `backlog` metrics have been updated.
([\#3723](https://github.com/informalsystems/hermes/issues/3723))
14 changes: 14 additions & 0 deletions crates/relayer/src/chain/counterparty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::channel::ChannelError;
use crate::client_state::IdentifiedAnyClientState;
use crate::path::PathIdentifiers;
use crate::supervisor::Error;
use crate::telemetry;

pub fn counterparty_chain_from_connection(
src_chain: &impl ChainHandle,
Expand Down Expand Up @@ -502,6 +503,19 @@ pub fn unreceived_packets(
&path.counterparty_channel_id,
)?;

telemetry!(
update_backlog,
commit_sequences
.iter()
.map(|s| std::convert::Into::<u64>::into(*s))
ljoss17 marked this conversation as resolved.
Show resolved Hide resolved
.collect::<Vec<u64>>()
.clone(),
&counterparty_chain.id(),
&path.counterparty_channel_id,
&path.counterparty_port_id,
&chain.id()
);

let packet_seq_nrs =
unreceived_packets_sequences(chain, &path.port_id, &path.channel_id, commit_sequences)?;

Expand Down
262 changes: 231 additions & 31 deletions crates/telemetry/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,9 @@ pub struct TelemetryState {
/// SendPacket events were relayed.
backlog_oldest_sequence: ObservableGauge<u64>,

/// Record the timestamp related to `backlog_oldest_sequence`.
/// Record the timestamp of the last time the `backlog_*` metrics have been updated.
/// The timestamp is the time passed since since the unix epoch in seconds.
backlog_oldest_timestamp: ObservableGauge<u64>,
backlog_latest_update_timestamp: ObservableGauge<u64>,

/// Records the length of the backlog, i.e., how many packets are pending.
backlog_size: ObservableGauge<u64>,
Expand Down Expand Up @@ -350,10 +350,10 @@ impl TelemetryState {
.with_description("Sequence number of the oldest SendPacket event in the backlog")
.init(),

backlog_oldest_timestamp: meter
.u64_observable_gauge("backlog_oldest_timestamp")
backlog_latest_update_timestamp: meter
.u64_observable_gauge("backlog_latest_update_timestamp")
.with_unit(Unit::new("seconds"))
.with_description("Local timestamp for the oldest SendPacket event in the backlog")
.with_description("Local timestamp for the last time the backlog metrics have been updated")
.init(),

backlog_size: meter
Expand Down Expand Up @@ -457,7 +457,7 @@ impl TelemetryState {
}

self.backlog_oldest_sequence.observe(&cx, 0, labels);
self.backlog_oldest_timestamp.observe(&cx, 0, labels);
self.backlog_latest_update_timestamp.observe(&cx, 0, labels);
self.backlog_size.observe(&cx, 0, labels);
}

Expand Down Expand Up @@ -922,8 +922,7 @@ impl TelemetryState {
};

// Update the backlog with the incoming data and retrieve the oldest values
let (oldest_sn, oldest_ts, total) = if let Some(path_backlog) = self.backlogs.get(&path_uid)
{
let (oldest_sn, total) = if let Some(path_backlog) = self.backlogs.get(&path_uid) {
// Avoid having the inner backlog map growing more than a given threshold, by removing
// the oldest sequence number entry.
if path_backlog.len() > BACKLOG_RESET_THRESHOLD {
Expand All @@ -935,20 +934,11 @@ impl TelemetryState {

// Return the oldest event information to be recorded in telemetry
if let Some(min) = path_backlog.iter().map(|v| *v.key()).min() {
if let Some(oldest) = path_backlog.get(&min) {
(min, *oldest.value(), path_backlog.len() as u64)
} else {
// Timestamp was not found, this should not happen, record a 0 ts.
(min, 0, path_backlog.len() as u64)
}
(min, path_backlog.len() as u64)
} else {
// We just inserted a new key/value, so this else branch is unlikely to activate,
// but it can happen in case of concurrent updates to the backlog.
(
EMPTY_BACKLOG_SYMBOL,
EMPTY_BACKLOG_SYMBOL,
EMPTY_BACKLOG_SYMBOL,
)
(EMPTY_BACKLOG_SYMBOL, EMPTY_BACKLOG_SYMBOL)
}
} else {
// If there is no inner backlog for this path, create a new map to store it.
Expand All @@ -958,16 +948,57 @@ impl TelemetryState {
self.backlogs.insert(path_uid, new_path_backlog);

// Return the current event information to be recorded in telemetry
(seq_nr, timestamp, 1)
(seq_nr, 1)
};

// Update metrics to reflect the new state of the backlog
self.backlog_oldest_sequence.observe(&cx, oldest_sn, labels);
self.backlog_oldest_timestamp
.observe(&cx, oldest_ts, labels);
self.backlog_latest_update_timestamp
.observe(&cx, timestamp, labels);
self.backlog_size.observe(&cx, total, labels);
}

/// Inserts in the backlog a new event for the given sequence number.
/// This happens when the relayer observed a new SendPacket event.
pub fn update_backlog(
&self,
sequences: Vec<u64>,
chain_id: &ChainId,
channel_id: &ChannelId,
port_id: &PortId,
counterparty_chain_id: &ChainId,
) {
// Unique identifier for a chain/channel/port.
let path_uid: PathIdentifier = PathIdentifier::new(
chain_id.to_string(),
channel_id.to_string(),
port_id.to_string(),
);

// This condition is done in order to avoid having an incorrect `backlog_latest_update_timestamp`.
// If the sequences is an empty vector by removing the entries using `backlog_remove` the `backlog_latest_update_timestamp`
// will only be updated if the current backlog is not empty.
// If the sequences is not empty, then it is possible to simple remove the backlog for that path and insert the sequences.
if sequences.is_empty() {
if let Some(path_backlog) = self.backlogs.get(&path_uid) {
let current_keys: Vec<u64> = path_backlog
.value()
.iter()
.map(|entry| *entry.key())
.collect();

for key in current_keys.iter() {
self.backlog_remove(*key, chain_id, channel_id, port_id, counterparty_chain_id)
}
}
} else {
self.backlogs.remove(&path_uid);
for key in sequences.iter() {
self.backlog_insert(*key, chain_id, channel_id, port_id, counterparty_chain_id)
}
}
}

/// Evicts from the backlog the event for the given sequence number.
/// Removing events happens when the relayer observed either an acknowledgment
/// or a timeout for a packet sequence number, which means that the corresponding
Expand Down Expand Up @@ -996,25 +1027,27 @@ impl TelemetryState {
KeyValue::new("port", port_id.to_string()),
];

// Retrieve local timestamp when this SendPacket event was recorded.
let now = Time::now();
let timestamp = match now.duration_since(Time::unix_epoch()) {
Ok(ts) => ts.as_secs(),
Err(_) => 0,
};

if let Some(path_backlog) = self.backlogs.get(&path_uid) {
if path_backlog.remove(&seq_nr).is_some() {
// If the entry was removed update the latest update timestamp.
self.backlog_latest_update_timestamp
.observe(&cx, timestamp, labels);
// The oldest pending sequence number is the minimum key in the inner (path) backlog.
if let Some(min_key) = path_backlog.iter().map(|v| *v.key()).min() {
if let Some(oldest) = path_backlog.get(&min_key) {
self.backlog_oldest_timestamp
.observe(&cx, *oldest.value(), labels);
} else {
self.backlog_oldest_timestamp.observe(&cx, 0, labels);
}
self.backlog_oldest_sequence.observe(&cx, min_key, labels);
self.backlog_size
.observe(&cx, path_backlog.len() as u64, labels);
} else {
// No mimimum found, update the metrics to reflect an empty backlog
self.backlog_oldest_sequence
.observe(&cx, EMPTY_BACKLOG_SYMBOL, labels);
self.backlog_oldest_timestamp
.observe(&cx, EMPTY_BACKLOG_SYMBOL, labels);
self.backlog_size.observe(&cx, EMPTY_BACKLOG_SYMBOL, labels);
}
}
Expand Down Expand Up @@ -1156,7 +1189,7 @@ impl AggregatorSelector for CustomAggregatorSelector {
match descriptor.name() {
"wallet_balance" => Some(Arc::new(last_value())),
"backlog_oldest_sequence" => Some(Arc::new(last_value())),
"backlog_oldest_timestamp" => Some(Arc::new(last_value())),
"backlog_latest_update_timestamp" => Some(Arc::new(last_value())),
"backlog_size" => Some(Arc::new(last_value())),
// Prometheus' supports only collector for histogram, sum, and last value aggregators.
// https://docs.rs/opentelemetry-prometheus/0.10.0/src/opentelemetry_prometheus/lib.rs.html#411-418
Expand All @@ -1168,3 +1201,170 @@ impl AggregatorSelector for CustomAggregatorSelector {
}
}
}

#[cfg(all(feature = "telemetry", test))]
#[cfg(test)]
mod tests {
romac marked this conversation as resolved.
Show resolved Hide resolved
use prometheus::proto::Metric;

use super::*;

#[test]
fn insert_remove_backlog() {
let state = TelemetryState::new(
Range {
start: 0,
end: 5000,
},
5,
Range {
start: 0,
end: 5000,
},
5,
);

let chain_id = ChainId::from_string("chain-test");
let counterparty_chain_id = ChainId::from_string("counterpartychain-test");
let channel_id = ChannelId::new(0);
let port_id = PortId::transfer();

state.backlog_insert(1, &chain_id, &channel_id, &port_id, &counterparty_chain_id);
state.backlog_insert(2, &chain_id, &channel_id, &port_id, &counterparty_chain_id);
state.backlog_insert(3, &chain_id, &channel_id, &port_id, &counterparty_chain_id);
state.backlog_insert(4, &chain_id, &channel_id, &port_id, &counterparty_chain_id);
state.backlog_insert(5, &chain_id, &channel_id, &port_id, &counterparty_chain_id);
state.backlog_remove(3, &chain_id, &channel_id, &port_id, &counterparty_chain_id);
state.backlog_remove(1, &chain_id, &channel_id, &port_id, &counterparty_chain_id);

let metrics = state.exporter.registry().gather().clone();
let backlog_size = metrics
.iter()
.find(|metric| metric.get_name() == "backlog_size")
.unwrap();
assert!(
assert_metric_value(backlog_size.get_metric(), 3),
"expected backlog_size to be 3"
);
let backlog_oldest_sequence = metrics
.iter()
.find(|&metric| metric.get_name() == "backlog_oldest_sequence")
.unwrap();
assert!(
assert_metric_value(backlog_oldest_sequence.get_metric(), 2),
"expected backlog_oldest_sequence to be 2"
);
}

#[test]
fn update_backlog() {
let state = TelemetryState::new(
Range {
start: 0,
end: 5000,
},
5,
Range {
start: 0,
end: 5000,
},
5,
);

let chain_id = ChainId::from_string("chain-test");
let counterparty_chain_id = ChainId::from_string("counterpartychain-test");
let channel_id = ChannelId::new(0);
let port_id = PortId::transfer();

state.backlog_insert(1, &chain_id, &channel_id, &port_id, &counterparty_chain_id);
state.backlog_insert(2, &chain_id, &channel_id, &port_id, &counterparty_chain_id);
state.backlog_insert(3, &chain_id, &channel_id, &port_id, &counterparty_chain_id);
state.backlog_insert(4, &chain_id, &channel_id, &port_id, &counterparty_chain_id);
state.backlog_insert(5, &chain_id, &channel_id, &port_id, &counterparty_chain_id);

state.update_backlog(
vec![5],
&chain_id,
&channel_id,
&port_id,
&counterparty_chain_id,
);

let metrics = state.exporter.registry().gather().clone();
let backlog_size = metrics
.iter()
.find(|&metric| metric.get_name() == "backlog_size")
.unwrap();
assert!(
assert_metric_value(backlog_size.get_metric(), 1),
"expected backlog_size to be 1"
);
let backlog_oldest_sequence = metrics
.iter()
.find(|&metric| metric.get_name() == "backlog_oldest_sequence")
.unwrap();
assert!(
assert_metric_value(backlog_oldest_sequence.get_metric(), 5),
"expected backlog_oldest_sequence to be 5"
);
}

#[test]
fn update_backlog_empty() {
let state = TelemetryState::new(
Range {
start: 0,
end: 5000,
},
5,
Range {
start: 0,
end: 5000,
},
5,
);

let chain_id = ChainId::from_string("chain-test");
let counterparty_chain_id = ChainId::from_string("counterpartychain-test");
let channel_id = ChannelId::new(0);
let port_id = PortId::transfer();

state.backlog_insert(1, &chain_id, &channel_id, &port_id, &counterparty_chain_id);
state.backlog_insert(2, &chain_id, &channel_id, &port_id, &counterparty_chain_id);
state.backlog_insert(3, &chain_id, &channel_id, &port_id, &counterparty_chain_id);
state.backlog_insert(4, &chain_id, &channel_id, &port_id, &counterparty_chain_id);
state.backlog_insert(5, &chain_id, &channel_id, &port_id, &counterparty_chain_id);

state.update_backlog(
vec![],
&chain_id,
&channel_id,
&port_id,
&counterparty_chain_id,
);

let metrics = state.exporter.registry().gather().clone();
let backlog_size = metrics
.iter()
.find(|&metric| metric.get_name() == "backlog_size")
.unwrap();
assert!(
assert_metric_value(backlog_size.get_metric(), 0),
"expected backlog_size to be 0"
);
let backlog_oldest_sequence = metrics
.iter()
.find(|&metric| metric.get_name() == "backlog_oldest_sequence")
.unwrap();
assert!(
assert_metric_value(backlog_oldest_sequence.get_metric(), 0),
"expected backlog_oldest_sequence to be 0"
);
}

fn assert_metric_value(metric: &[Metric], expected: u64) -> bool {
metric
.iter()
.any(|m| m.get_gauge().get_value() as u64 == expected)
}
}
2 changes: 1 addition & 1 deletion guide/src/assets/grafana_template.json
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,7 @@
},
"editorMode": "builder",
"exemplar": false,
"expr": "backlog_oldest_timestamp{job=\"hermes\"}",
"expr": "backlog_latest_update_timestamp{job=\"hermes\"}",
"format": "table",
"hide": false,
"instant": true,
Expand Down
Loading
Loading