diff --git a/CHANGELOG.md b/CHANGELOG.md index aefdae2b33..67b40f9c0f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ - Add quotas to global config. ([#3086](https://github.com/getsentry/relay/pull/3086)) - Adds support for dynamic metric bucket encoding. ([#3137](https://github.com/getsentry/relay/pull/3137)) +- Use statsdproxy to pre-aggregate metrics. ([#2425](https://github.com/getsentry/relay/pull/2425)) ## 24.2.0 diff --git a/Cargo.lock b/Cargo.lock index 97223f2673..efbdb964ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1320,6 +1320,19 @@ dependencies = [ "syn 2.0.38", ] +[[package]] +name = "env_logger" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cd405aab171cb85d6735e5c8d9db038c17d3ca007a4d2c25f337935c3d90580" +dependencies = [ + "humantime", + "is-terminal", + "log", + "regex", + "termcolor", +] + [[package]] name = "equivalent" version = "1.0.0" @@ -2064,6 +2077,12 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62eef4964b4e1c2d66981a5646d893768fd15d96957aae5e0e85c632503e9724" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "hyper" version = "0.14.26" @@ -4163,6 +4182,7 @@ dependencies = [ "parking_lot", "rand", "relay-log", + "statsdproxy", ] [[package]] @@ -5205,6 +5225,21 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" +[[package]] +name = "statsdproxy" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "793ec303cef342e5a80e717016050673430851d5e3159ba8275c0f801cc83c11" +dependencies = [ + "anyhow", + "cadence", + "crc32fast", + "env_logger", + "log", + "rand", + "thread_local", +] + [[package]] name = "string_cache" version = "0.8.4" diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 6e42e9b213..a09f8d1c29 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -497,6 +497,10 @@ struct Metrics { /// /// Defaults to `true`. buffering: bool, + /// Emitted metrics will be aggregated to optimize bandwidth. + /// + /// Defaults to `true`. + aggregation: bool, /// Global sample rate for all emitted metrics between `0.0` and `1.0`. /// /// For example, a value of `0.3` means that only 30% of the emitted metrics will be sent. @@ -512,6 +516,7 @@ impl Default for Metrics { default_tags: BTreeMap::new(), hostname_tag: None, buffering: true, + aggregation: true, sample_rate: 1.0, } } @@ -1823,6 +1828,11 @@ impl Config { self.values.metrics.buffering } + /// Returns true if metrics aggregation is enabled, false otherwise. + pub fn metrics_aggregation(&self) -> bool { + self.values.metrics.aggregation + } + /// Returns the global sample rate for all metrics. pub fn metrics_sample_rate(&self) -> f32 { self.values.metrics.sample_rate diff --git a/relay-statsd/Cargo.toml b/relay-statsd/Cargo.toml index 26fe913d26..f76d3004b6 100644 --- a/relay-statsd/Cargo.toml +++ b/relay-statsd/Cargo.toml @@ -15,6 +15,7 @@ crossbeam-channel = "0.5.6" parking_lot = { workspace = true } rand = { workspace = true } relay-log = { path = "../relay-log" } +statsdproxy = { version = "0.1.2", features = ["cadence-adapter"], default-features = false } [features] default = [] diff --git a/relay-statsd/src/lib.rs b/relay-statsd/src/lib.rs index f3da2dde40..b16e548f5f 100644 --- a/relay-statsd/src/lib.rs +++ b/relay-statsd/src/lib.rs @@ -23,7 +23,7 @@ //! ```no_run //! # use std::collections::BTreeMap; //! -//! relay_statsd::init("myprefix", "localhost:8125", BTreeMap::new(), true, 1.0); +//! relay_statsd::init("myprefix", "localhost:8125", BTreeMap::new(), true, true, 1.0); //! ``` //! //! ## Macro Usage @@ -66,6 +66,8 @@ use cadence::{ }; use parking_lot::RwLock; use rand::distributions::{Distribution, Uniform}; +use statsdproxy::cadence::StatsdProxyMetricSink; +use statsdproxy::config::AggregateMetricsConfig; /// Maximum number of metric events that can be queued before we start dropping them const METRICS_MAX_QUEUE_SIZE: usize = 100_000; @@ -226,6 +228,7 @@ pub fn init( host: A, default_tags: BTreeMap, buffering: bool, + aggregating: bool, sample_rate: f32, ) { let addrs: Vec<_> = host.to_socket_addrs().unwrap().collect(); @@ -247,7 +250,24 @@ pub fn init( let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); socket.set_nonblocking(true).unwrap(); - let statsd_client = if buffering { + let statsd_client = if aggregating { + let host = host.to_socket_addrs().unwrap().next().unwrap(); + let statsdproxy_sink = StatsdProxyMetricSink::new(move || { + let next_step = statsdproxy::middleware::upstream::Upstream::new(host) + .expect("failed to create statsdproxy metric sink"); + statsdproxy::middleware::aggregate::AggregateMetrics::new( + AggregateMetricsConfig { + aggregate_gauges: true, + aggregate_counters: true, + flush_interval: 1, + flush_offset: 0, + max_map_size: None, + }, + next_step, + ) + }); + StatsdClient::from_sink(prefix, statsdproxy_sink) + } else if buffering { let udp_sink = BufferedUdpMetricSink::from(host, socket).unwrap(); let queuing_sink = QueuingMetricSink::with_capacity(udp_sink, METRICS_MAX_QUEUE_SIZE); StatsdClient::from_sink(prefix, queuing_sink) diff --git a/relay/src/setup.rs b/relay/src/setup.rs index 109f9837eb..5c3de39913 100644 --- a/relay/src/setup.rs +++ b/relay/src/setup.rs @@ -69,6 +69,7 @@ pub fn init_metrics(config: &Config) -> Result<()> { &addrs[..], default_tags, config.metrics_buffering(), + config.metrics_aggregation(), config.metrics_sample_rate(), );