diff --git a/metrics-exporter-dogstatsd/src/builder.rs b/metrics-exporter-dogstatsd/src/builder.rs index a922df54..b1f0a254 100644 --- a/metrics-exporter-dogstatsd/src/builder.rs +++ b/metrics-exporter-dogstatsd/src/builder.rs @@ -1,6 +1,7 @@ -use std::{net::SocketAddr, sync::Arc, time::Duration}; +use std::{fmt, net::SocketAddr, sync::Arc, time::Duration}; use thiserror::Error; +use tracing::debug; use crate::{ forwarder::{self, ForwarderConfiguration, RemoteAddr}, @@ -10,13 +11,13 @@ use crate::{ // Maximum data length for a UDP datagram. // -// Realistically, users should basically never send payloads anywhere _near_ this large, but we're only trying to ensure -// we're not about to do anything that we _know_ is technically invalid. +// Realistically, users should never send payloads anywhere _near_ this large, but we're only trying to ensure we're not +// about to do anything that we _know_ is technically invalid. const UDP_DATAGRAM_MAX_PAYLOAD_LEN: usize = (u16::MAX as usize) - 8; const DEFAULT_WRITE_TIMEOUT: Duration = Duration::from_secs(1); -const DEFAULT_MAX_PAYLOAD_LEN: usize = 8192; -const DEFAULT_FLUSH_INTERVAL: Duration = Duration::from_secs(3); +const DEFAULT_FLUSH_INTERVAL_CONSERVATIVE: Duration = Duration::from_secs(3); +const DEFAULT_FLUSH_INTERVAL_AGGRESSIVE: Duration = Duration::from_secs(10); const DEFAULT_HISTOGRAM_RESERVOIR_SIZE: usize = 1024; /// Errors that could occur while building or installing a DogStatsD recorder/exporter. @@ -65,13 +66,31 @@ pub enum AggregationMode { Aggressive, } +impl AggregationMode { + fn default_flush_interval(&self) -> Duration { + match self { + AggregationMode::Conservative => DEFAULT_FLUSH_INTERVAL_CONSERVATIVE, + AggregationMode::Aggressive => DEFAULT_FLUSH_INTERVAL_AGGRESSIVE, + } + } +} + +impl fmt::Display for AggregationMode { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + AggregationMode::Conservative => write!(f, "conservative"), + AggregationMode::Aggressive => write!(f, "aggressive"), + } + } +} + /// Builder for a DogStatsD exporter. #[derive(Debug)] pub struct DogStatsDBuilder { remote_addr: RemoteAddr, write_timeout: Duration, - max_payload_len: usize, - flush_interval: Duration, + max_payload_len: Option, + flush_interval: Option, synchronous: bool, agg_mode: AggregationMode, telemetry: bool, @@ -81,20 +100,30 @@ pub struct DogStatsDBuilder { } impl DogStatsDBuilder { + fn get_max_payload_len(&self) -> usize { + self.max_payload_len.unwrap_or_else(|| self.remote_addr.default_max_payload_len()) + } + + fn get_flush_interval(&self) -> Duration { + self.flush_interval.unwrap_or_else(|| self.agg_mode.default_flush_interval()) + } + fn validate_max_payload_len(&self) -> Result<(), BuildError> { + let max_payload_len = self.get_max_payload_len(); + if let RemoteAddr::Udp(_) = &self.remote_addr { - if self.max_payload_len > UDP_DATAGRAM_MAX_PAYLOAD_LEN { + if max_payload_len > UDP_DATAGRAM_MAX_PAYLOAD_LEN { return Err(BuildError::InvalidConfiguration { - reason: format!("maximum payload length ({} bytes) exceeds UDP datagram maximum length ({} bytes)", self.max_payload_len, UDP_DATAGRAM_MAX_PAYLOAD_LEN), + reason: format!("maximum payload length ({} bytes) exceeds UDP datagram maximum length ({} bytes)", max_payload_len, UDP_DATAGRAM_MAX_PAYLOAD_LEN), }); } } - if self.max_payload_len > u32::MAX as usize { + if max_payload_len > u32::MAX as usize { return Err(BuildError::InvalidConfiguration { reason: format!( "maximum payload length ({} bytes) exceeds theoretical upper bound ({} bytes)", - self.max_payload_len, + max_payload_len, u32::MAX ), }); @@ -146,7 +175,7 @@ impl DogStatsDBuilder { /// Setting a higher value is likely to lead to invalid metric payloads that are discarded by the Datadog Agent when /// received. /// - /// Defaults to 8192 bytes. + /// Defaults to 1432 bytes for UDP, and 8192 bytes for Unix domain sockets. /// /// # Errors /// @@ -155,7 +184,7 @@ impl DogStatsDBuilder { mut self, max_payload_len: usize, ) -> Result { - self.max_payload_len = max_payload_len; + self.max_payload_len = Some(max_payload_len); self.validate_max_payload_len()?; Ok(self) @@ -193,10 +222,10 @@ impl DogStatsDBuilder { /// aggregation. A shorter interval will provide more frequent updates to the remote server, but will result in more /// network traffic and processing overhead. /// - /// Defaults to 3 seconds. + /// Defaults to 3 seconds in conservative mode, and 10 seconds in aggressive mode. #[must_use] pub fn with_flush_interval(mut self, flush_interval: Duration) -> Self { - self.flush_interval = flush_interval; + self.flush_interval = Some(flush_interval); self } @@ -276,6 +305,16 @@ impl DogStatsDBuilder { pub fn build(self) -> Result { self.validate_max_payload_len()?; + let max_payload_len = self.get_max_payload_len(); + let flush_interval = self.get_flush_interval(); + + debug!( + agg_mode = %self.agg_mode, + histogram_sampling = self.histogram_sampling, + histogram_reservoir_size = self.histogram_reservoir_size, + histograms_as_distributions = self.histograms_as_distributions, + "Building DogStatsD exporter." + ); let state_config = StateConfiguration { agg_mode: self.agg_mode, telemetry: self.telemetry, @@ -283,18 +322,28 @@ impl DogStatsDBuilder { histogram_reservoir_size: self.histogram_reservoir_size, histograms_as_distributions: self.histograms_as_distributions, }; + let state = Arc::new(State::new(state_config)); let recorder = DogStatsDRecorder::new(Arc::clone(&state)); + debug!( + remote_addr = %self.remote_addr, + max_payload_len, + ?flush_interval, + write_timeout = ?self.write_timeout, + "Building DogStatsD forwarder." + ); let forwarder_config = ForwarderConfiguration { remote_addr: self.remote_addr, - max_payload_len: self.max_payload_len, - flush_interval: self.flush_interval, + max_payload_len, + flush_interval, write_timeout: self.write_timeout, }; if self.synchronous { + debug!("Spawning synchronous forwarder backend."); + let forwarder = forwarder::sync::Forwarder::new(forwarder_config, state); std::thread::Builder::new() @@ -330,8 +379,8 @@ impl Default for DogStatsDBuilder { DogStatsDBuilder { remote_addr: RemoteAddr::Udp(vec![SocketAddr::from(([127, 0, 0, 1], 8125))]), write_timeout: DEFAULT_WRITE_TIMEOUT, - max_payload_len: DEFAULT_MAX_PAYLOAD_LEN, - flush_interval: DEFAULT_FLUSH_INTERVAL, + max_payload_len: None, + flush_interval: None, synchronous: true, agg_mode: AggregationMode::Conservative, telemetry: true, @@ -346,6 +395,31 @@ impl Default for DogStatsDBuilder { mod tests { use super::*; + #[test] + fn default_flush_interval_agg_mode() { + let builder = + DogStatsDBuilder::default().with_aggregation_mode(AggregationMode::Conservative); + assert_eq!(builder.get_flush_interval(), DEFAULT_FLUSH_INTERVAL_CONSERVATIVE); + + let builder = + DogStatsDBuilder::default().with_aggregation_mode(AggregationMode::Aggressive); + assert_eq!(builder.get_flush_interval(), DEFAULT_FLUSH_INTERVAL_AGGRESSIVE); + + let custom_flush_interval = Duration::from_millis(123456789); + let builder = DogStatsDBuilder::default().with_flush_interval(custom_flush_interval); + assert_eq!(builder.get_flush_interval(), custom_flush_interval); + } + + #[test] + fn default_max_payload_len_udp() { + let builder = DogStatsDBuilder::default() + .with_remote_address("127.0.0.1:9999") + .expect("address should be valid"); + + assert_eq!(builder.get_max_payload_len(), 1432); + assert!(builder.build().is_ok()); + } + #[test] fn max_payload_len_exceeds_udp_max_len() { let builder = @@ -367,6 +441,23 @@ mod tests { mod linux { use super::*; + #[test] + fn default_max_payload_len_uds() { + let builder = DogStatsDBuilder::default() + .with_remote_address("unix:///tmp/dogstatsd.sock") + .expect("address should be valid"); + + assert_eq!(builder.get_max_payload_len(), 8192); + assert!(builder.build().is_ok()); + + let builder = DogStatsDBuilder::default() + .with_remote_address("unixgram:///tmp/dogstatsd.sock") + .expect("address should be valid"); + + assert_eq!(builder.get_max_payload_len(), 8192); + assert!(builder.build().is_ok()); + } + #[test] fn max_payload_len_exceeds_udp_max_len_transport_change() { let builder = DogStatsDBuilder::default() diff --git a/metrics-exporter-dogstatsd/src/forwarder/mod.rs b/metrics-exporter-dogstatsd/src/forwarder/mod.rs index 4b4cdeed..6cb1a4af 100644 --- a/metrics-exporter-dogstatsd/src/forwarder/mod.rs +++ b/metrics-exporter-dogstatsd/src/forwarder/mod.rs @@ -1,6 +1,7 @@ #[cfg(target_os = "linux")] use std::path::PathBuf; use std::{ + fmt, net::{SocketAddr, ToSocketAddrs as _}, time::Duration, }; @@ -32,6 +33,14 @@ impl RemoteAddr { RemoteAddr::Unixgram(_) => "uds", } } + + pub(crate) fn default_max_payload_len(&self) -> usize { + match self { + RemoteAddr::Udp(_) => 1432, + #[cfg(target_os = "linux")] + RemoteAddr::Unix(_) | RemoteAddr::Unixgram(_) => 8192, + } + } } impl<'a> TryFrom<&'a str> for RemoteAddr { @@ -61,6 +70,33 @@ impl<'a> TryFrom<&'a str> for RemoteAddr { } } +impl fmt::Display for RemoteAddr { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + RemoteAddr::Udp(addrs) => { + if addrs.len() == 1 { + write!(f, "udp://{}", addrs[0]) + } else { + write!(f, "udp://[")?; + + for (idx, addr) in addrs.iter().enumerate() { + if idx == 0 { + write!(f, "{}", addr)?; + } else { + write!(f, ",{}", addr)?; + } + } + write!(f, "]") + } + } + #[cfg(target_os = "linux")] + RemoteAddr::Unix(path) | RemoteAddr::Unixgram(path) => { + write!(f, "unixgram://{}", path.display()) + } + } + } +} + fn unknown_scheme_error_str(scheme: &str) -> String { format!("invalid scheme '{scheme}' (expected 'udp', 'unix', or 'unixgram')") }