Skip to content

Commit

Permalink
Update metrics crate (#1462)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Feb 9, 2024
1 parent b94c868 commit c992dc4
Show file tree
Hide file tree
Showing 17 changed files with 168 additions and 156 deletions.
220 changes: 116 additions & 104 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,5 @@ typetag = "0.2.5"
aws-throwaway = { version = "0.6.0", default-features = false }
tokio-bin-process = "0.4.0"
ordered-float = { version = "4.0.0", features = ["serde"] }
hyper = { version = "0.14.14", features = ["server"] }
hyper = { version = "0.14.14", features = ["server", "tcp", "http1"] }
shell-quote = { default-features = false, version = "0.5.0" }
4 changes: 2 additions & 2 deletions shotover/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ httparse = { version = "1.8.0", optional = true }
http = { version = "1.0.0", optional = true }

#Observability
metrics = "0.21.0"
metrics-exporter-prometheus = "0.12.0"
metrics = "0.22.0"
metrics-exporter-prometheus = { version = "0.13.0", default-features = false }
tracing.workspace = true
tracing-subscriber.workspace = true
tracing-appender.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion shotover/benches/benches/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ fn init() {
std::env::set_var("RUST_LIB_BACKTRACE", "0");

let recorder = PrometheusBuilder::new().build_recorder();
metrics::set_boxed_recorder(Box::new(recorder)).ok();
metrics::set_global_recorder(recorder).ok();
}

criterion_main!(
Expand Down
8 changes: 4 additions & 4 deletions shotover/src/codec/cassandra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use cassandra_protocol::frame::{Flags, Opcode, Version, PAYLOAD_SIZE_LIMIT};
use cql3_parser::cassandra_statement::CassandraStatement;
use cql3_parser::common::Identifier;
use lz4_flex::{block::get_maximum_output_size, compress_into, decompress};
use metrics::{register_counter, Counter, Histogram};
use metrics::{counter, Counter, Histogram};
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
Expand Down Expand Up @@ -154,9 +154,9 @@ pub struct VersionCounter {
impl VersionCounter {
fn new() -> Self {
Self {
v3: register_counter!("shotover_client_protocol_version_count", "version" => "v3"),
v4: register_counter!("shotover_client_protocol_version_count", "version" => "v4"),
v5: register_counter!("shotover_client_protocol_version_count", "version" => "v5"),
v3: counter!("shotover_client_protocol_version_count", "version" => "v3"),
v4: counter!("shotover_client_protocol_version_count", "version" => "v4"),
v5: counter!("shotover_client_protocol_version_count", "version" => "v5"),
}
}

Expand Down
6 changes: 3 additions & 3 deletions shotover/src/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use cassandra_protocol::compression::Compression;
use core::fmt;
#[cfg(feature = "kafka")]
use kafka::RequestHeader;
use metrics::{register_histogram, Histogram};
use metrics::{histogram, Histogram};
use tokio_util::codec::{Decoder, Encoder};

#[cfg(feature = "cassandra")]
Expand Down Expand Up @@ -36,10 +36,10 @@ impl fmt::Display for Direction {
pub fn message_latency(direction: Direction, destination_name: String) -> Histogram {
match direction {
Direction::Source => {
register_histogram!("shotover_sink_to_source_latency_seconds", "source" => destination_name)
histogram!("shotover_sink_to_source_latency_seconds", "source" => destination_name)
}
Direction::Sink => {
register_histogram!("shotover_source_to_sink_latency_seconds", "sink" => destination_name)
histogram!("shotover_source_to_sink_latency_seconds", "sink" => destination_name)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion shotover/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl Shotover {
.unwrap()
.build_recorder();
let handle = recorder.handle();
metrics::set_boxed_recorder(Box::new(recorder))?;
metrics::set_global_recorder(recorder)?;

let socket: SocketAddr = config.observability_interface.parse()?;
let exporter = LogFilterHttpExporter::new(handle, socket, tracing.handle.clone());
Expand Down
5 changes: 3 additions & 2 deletions shotover/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use anyhow::{anyhow, Context, Result};
use bytes::BytesMut;
use futures::future::join_all;
use futures::{SinkExt, StreamExt};
use metrics::{register_gauge, Gauge};
use metrics::{gauge, Gauge};
use std::io::ErrorKind;
use std::net::SocketAddr;
use std::sync::Arc;
Expand Down Expand Up @@ -88,7 +88,8 @@ impl<C: CodecBuilder + 'static> TcpCodecListener<C> {
timeout: Option<Duration>,
transport: Transport,
) -> Result<Self, Vec<String>> {
let available_connections_gauge = register_gauge!("shotover_available_connections_count", "source" => source_name.clone());
let available_connections_gauge =
gauge!("shotover_available_connections_count", "source" => source_name.clone());
available_connections_gauge.set(limit_connections.available_permits() as f64);

let chain_builder = chain_config
Expand Down
4 changes: 2 additions & 2 deletions shotover/src/transforms/cassandra/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use cql3_parser::cassandra_statement::CassandraStatement;
use cql3_parser::common::IdentifierRef;
use futures::stream::FuturesOrdered;
use futures::StreamExt;
use metrics::{register_counter, Counter};
use metrics::{counter, Counter};
use node::{CassandraNode, ConnectionFactory};
use node_pool::{GetReplicaErr, KeyspaceMetadata, NodePool};
use rand::prelude::*;
Expand Down Expand Up @@ -116,7 +116,7 @@ impl CassandraSinkClusterBuilder {
connect_timeout_ms: u64,
timeout: Option<u64>,
) -> Self {
let failed_requests = register_counter!("shotover_failed_requests_count", "chain" => chain_name.clone(), "transform" => "CassandraSinkCluster");
let failed_requests = counter!("shotover_failed_requests_count", "chain" => chain_name.clone(), "transform" => "CassandraSinkCluster");
let receive_timeout = timeout.map(Duration::from_secs);
let connect_timeout = Duration::from_millis(connect_timeout_ms);

Expand Down
4 changes: 2 additions & 2 deletions shotover/src/transforms/cassandra/sink_cluster/node_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::transforms::cassandra::connection::CassandraConnection;
use anyhow::{anyhow, Context, Error, Result};
use cassandra_protocol::frame::message_execute::BodyReqExecuteOwned;
use cassandra_protocol::types::CBytesShort;
use metrics::{register_counter, Counter};
use metrics::{counter, Counter};
use rand::prelude::*;
use std::sync::Arc;
use std::{collections::HashMap, net::SocketAddr};
Expand Down Expand Up @@ -52,7 +52,7 @@ impl NodePoolBuilder {
pub fn new(chain_name: String) -> Self {
Self {
prepared_metadata: Arc::new(RwLock::new(HashMap::new())),
out_of_rack_requests: register_counter!("shotover_out_of_rack_requests_count", "chain" => chain_name, "transform" => "CassandraSinkCluster"),
out_of_rack_requests: counter!("shotover_out_of_rack_requests_count", "chain" => chain_name, "transform" => "CassandraSinkCluster"),
}
}

Expand Down
4 changes: 2 additions & 2 deletions shotover/src/transforms/cassandra/sink_single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use anyhow::{anyhow, Result};
use async_trait::async_trait;
use cassandra_protocol::frame::Version;
use futures::stream::FuturesOrdered;
use metrics::{register_counter, Counter};
use metrics::{counter, Counter};
use serde::{Deserialize, Serialize};
use std::time::Duration;
use tokio::sync::{mpsc, oneshot};
Expand Down Expand Up @@ -60,7 +60,7 @@ impl CassandraSinkSingleBuilder {
connect_timeout_ms: u64,
timeout: Option<u64>,
) -> CassandraSinkSingleBuilder {
let failed_requests = register_counter!("shotover_failed_requests_count", "chain" => chain_name, "transform" => "CassandraSinkSingle");
let failed_requests = counter!("shotover_failed_requests_count", "chain" => chain_name, "transform" => "CassandraSinkSingle");
let receive_timeout = timeout.map(Duration::from_secs);
let codec_builder =
CassandraCodecBuilder::new(Direction::Sink, "CassandraSinkSingle".to_owned());
Expand Down
24 changes: 12 additions & 12 deletions shotover/src/transforms/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::transforms::{Transform, TransformBuilder, Wrapper};
use anyhow::{anyhow, Result};
use derivative::Derivative;
use futures::TryFutureExt;
use metrics::{histogram, register_counter, register_histogram, Counter, Histogram};
use metrics::{counter, histogram, Counter, Histogram};
use std::net::SocketAddr;
use tokio::sync::{mpsc, oneshot};
use tokio::time::{Duration, Instant};
Expand Down Expand Up @@ -168,7 +168,7 @@ impl TransformChain {
self.chain_failures.increment(1);
}

histogram!("shotover_chain_latency_seconds", start.elapsed(), "chain" => self.name, "client_details" => client_details);
histogram!("shotover_chain_latency_seconds", "chain" => self.name, "client_details" => client_details).record(start.elapsed());
result
}

Expand All @@ -184,7 +184,7 @@ impl TransformChain {
self.chain_failures.increment(1);
}

histogram!("shotover_chain_latency_seconds", start.elapsed(), "chain" => self.name, "client_details" => client_details);
histogram!("shotover_chain_latency_seconds", "chain" => self.name, "client_details" => client_details).record(start.elapsed());
result
}
}
Expand Down Expand Up @@ -264,20 +264,20 @@ impl TransformChainBuilder {
pub fn new(chain: Vec<Box<dyn TransformBuilder>>, name: &'static str) -> Self {
let chain = chain.into_iter().map(|builder|
TransformBuilderAndMetrics {
transform_total: register_counter!("shotover_transform_total_count", "transform" => builder.get_name()),
transform_failures: register_counter!("shotover_transform_failures_count", "transform" => builder.get_name()),
transform_latency: register_histogram!("shotover_transform_latency_seconds", "transform" => builder.get_name()),
transform_pushed_total: register_counter!("shotover_transform_pushed_total_count", "transform" => builder.get_name()),
transform_pushed_failures: register_counter!("shotover_transform_pushed_failures_count", "transform" => builder.get_name()),
transform_pushed_latency: register_histogram!("shotover_transform_pushed_latency_seconds", "transform" => builder.get_name()),
transform_total: counter!("shotover_transform_total_count", "transform" => builder.get_name()),
transform_failures: counter!("shotover_transform_failures_count", "transform" => builder.get_name()),
transform_latency: histogram!("shotover_transform_latency_seconds", "transform" => builder.get_name()),
transform_pushed_total: counter!("shotover_transform_pushed_total_count", "transform" => builder.get_name()),
transform_pushed_failures: counter!("shotover_transform_pushed_failures_count", "transform" => builder.get_name()),
transform_pushed_latency: histogram!("shotover_transform_pushed_latency_seconds", "transform" => builder.get_name()),
builder,
}
).collect();

let chain_batch_size =
register_histogram!("shotover_chain_messages_per_batch_count", "chain" => name);
let chain_total = register_counter!("shotover_chain_total_count", "chain" => name);
let chain_failures = register_counter!("shotover_chain_failures_count", "chain" => name);
histogram!("shotover_chain_messages_per_batch_count", "chain" => name);
let chain_total = counter!("shotover_chain_total_count", "chain" => name);
let chain_failures = counter!("shotover_chain_failures_count", "chain" => name);
// Cant register shotover_chain_latency_seconds because a unique one is created for each client ip address

TransformChainBuilder {
Expand Down
14 changes: 7 additions & 7 deletions shotover/src/transforms/query_counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::transforms::TransformConfig;
use crate::transforms::{Transform, TransformBuilder, Wrapper};
use anyhow::Result;
use async_trait::async_trait;
use metrics::{counter, register_counter};
use metrics::counter;
use serde::Deserialize;
use serde::Serialize;

Expand All @@ -21,7 +21,7 @@ pub struct QueryCounterConfig {

impl QueryCounter {
pub fn new(counter_name: String) -> Self {
register_counter!("shotover_query_count", "name" => counter_name.clone());
counter!("shotover_query_count", "name" => counter_name.clone());

QueryCounter { counter_name }
}
Expand Down Expand Up @@ -49,20 +49,20 @@ impl Transform for QueryCounter {
#[cfg(feature = "cassandra")]
Some(Frame::Cassandra(frame)) => {
for statement in frame.operation.queries() {
counter!("shotover_query_count", 1, "name" => self.counter_name.clone(), "query" => statement.short_name(), "type" => "cassandra");
counter!("shotover_query_count", "name" => self.counter_name.clone(), "query" => statement.short_name(), "type" => "cassandra").increment(1);
}
}
#[cfg(feature = "redis")]
Some(Frame::Redis(frame)) => {
if let Some(query_type) = crate::frame::redis::redis_query_name(frame) {
counter!("shotover_query_count", 1, "name" => self.counter_name.clone(), "query" => query_type, "type" => "redis");
counter!("shotover_query_count", "name" => self.counter_name.clone(), "query" => query_type, "type" => "redis").increment(1);
} else {
counter!("shotover_query_count", 1, "name" => self.counter_name.clone(), "query" => "unknown", "type" => "redis");
counter!("shotover_query_count", "name" => self.counter_name.clone(), "query" => "unknown", "type" => "redis").increment(1);
}
}
#[cfg(feature = "kafka")]
Some(Frame::Kafka(_)) => {
counter!("shotover_query_count", 1, "name" => self.counter_name.clone(), "query" => "unknown", "type" => "kafka");
counter!("shotover_query_count", "name" => self.counter_name.clone(), "query" => "unknown", "type" => "kafka").increment(1);
}
Some(Frame::Dummy) => {
// Dummy does not count as a message
Expand All @@ -72,7 +72,7 @@ impl Transform for QueryCounter {
todo!();
}
None => {
counter!("shotover_query_count", 1, "name" => self.counter_name.clone(), "query" => "unknown", "type" => "none")
counter!("shotover_query_count", "name" => self.counter_name.clone(), "query" => "unknown", "type" => "none").increment(1)
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions shotover/src/transforms/redis/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use cql3_parser::cassandra_statement::CassandraStatement;
use cql3_parser::common::{FQName, Identifier, Operand, RelationElement, RelationOperator};
use cql3_parser::select::Select;
use itertools::Itertools;
use metrics::{register_counter, Counter};
use metrics::{counter, Counter};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap};
use std::net::SocketAddr;
Expand Down Expand Up @@ -86,7 +86,7 @@ const NAME: &str = "RedisCache";
#[async_trait(?Send)]
impl TransformConfig for RedisConfig {
async fn get_builder(&self, _chain_name: String) -> Result<Box<dyn TransformBuilder>> {
let missed_requests = register_counter!("shotover_cache_miss_count");
let missed_requests = counter!("shotover_cache_miss_count");

let caching_schema: HashMap<FQName, TableCacheSchema> = self
.caching_schema
Expand Down Expand Up @@ -611,7 +611,7 @@ mod test {
use crate::transforms::TransformBuilder;
use bytes::Bytes;
use cql3_parser::common::Identifier;
use metrics::register_counter;
use metrics::counter;
use std::collections::HashMap;

#[test]
Expand Down Expand Up @@ -811,7 +811,7 @@ mod test {
let transform = SimpleRedisCacheBuilder {
cache_chain: TransformChainBuilder::new(vec![], "test-chain"),
caching_schema: HashMap::new(),
missed_requests: register_counter!("cache_miss"),
missed_requests: counter!("cache_miss"),
};

assert_eq!(
Expand All @@ -838,7 +838,7 @@ mod test {
let transform = SimpleRedisCacheBuilder {
cache_chain,
caching_schema: HashMap::new(),
missed_requests: register_counter!("cache_miss"),
missed_requests: counter!("cache_miss"),
};

assert_eq!(transform.validate(), Vec::<String>::new());
Expand Down
6 changes: 3 additions & 3 deletions shotover/src/transforms/redis/sink_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use futures::stream::FuturesOrdered;
use futures::stream::FuturesUnordered;
use futures::{StreamExt, TryFutureExt};
use itertools::Itertools;
use metrics::{counter, register_counter};
use metrics::counter;
use rand::rngs::SmallRng;
use rand::seq::IteratorRandom;
use rand::SeedableRng;
Expand Down Expand Up @@ -159,7 +159,7 @@ impl RedisSinkCluster {
token: None,
};

register_counter!("shotover_failed_requests_count", "chain" => chain_name, "transform" => sink_cluster.get_name());
counter!("shotover_failed_requests_count", "chain" => chain_name, "transform" => sink_cluster.get_name());

sink_cluster
}
Expand Down Expand Up @@ -604,7 +604,7 @@ impl RedisSinkCluster {

#[inline(always)]
fn send_error_response(&self, message: &str) -> Result<ResponseFuture> {
counter!("shotover_failed_requests_count", 1, "chain" => self.chain_name.clone(), "transform" => self.get_name());
counter!("shotover_failed_requests_count", "chain" => self.chain_name.clone(), "transform" => self.get_name()).increment(1);
short_circuit(RedisFrame::Error(message.into()))
}

Expand Down
4 changes: 2 additions & 2 deletions shotover/src/transforms/redis/sink_single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::transforms::{Transform, TransformBuilder, TransformConfig, Wrapper};
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use futures::{FutureExt, SinkExt, StreamExt};
use metrics::{register_counter, Counter};
use metrics::{counter, Counter};
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::pin::Pin;
Expand Down Expand Up @@ -59,7 +59,7 @@ impl RedisSinkSingleBuilder {
chain_name: String,
connect_timeout_ms: u64,
) -> Self {
let failed_requests = register_counter!("shotover_failed_requests_count", "chain" => chain_name, "transform" => "RedisSinkSingle");
let failed_requests = counter!("shotover_failed_requests_count", "chain" => chain_name, "transform" => "RedisSinkSingle");
let connect_timeout = Duration::from_millis(connect_timeout_ms);

RedisSinkSingleBuilder {
Expand Down
5 changes: 2 additions & 3 deletions shotover/src/transforms/tee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use hyper::{
service::{make_service_fn, service_fn},
Method, Request, StatusCode, {Body, Response, Server},
};
use metrics::{register_counter, Counter};
use metrics::{counter, Counter};
use serde::{Deserialize, Serialize};
use std::fmt;
use std::sync::atomic::Ordering;
Expand Down Expand Up @@ -49,8 +49,7 @@ impl TeeBuilder {
tokio::spawn(chain_switch_listener.async_run(result_source.clone()));
}

let dropped_messages =
register_counter!("shotover_tee_dropped_messages_count", "chain" => "Tee");
let dropped_messages = counter!("shotover_tee_dropped_messages_count", "chain" => "Tee");

TeeBuilder {
tx,
Expand Down

0 comments on commit c992dc4

Please sign in to comment.