diff --git a/src/sinks/datadog/metrics/encoder.rs b/src/sinks/datadog/metrics/encoder.rs index b93a19a008476..1221fb7b688cf 100644 --- a/src/sinks/datadog/metrics/encoder.rs +++ b/src/sinks/datadog/metrics/encoder.rs @@ -609,42 +609,46 @@ fn series_to_proto_message( let timestamp = encode_timestamp(metric.timestamp()); - let (points, metric_type, interval) = match (metric.value(), metric.interval_ms()) { - (MetricValue::Counter { value }, maybe_interval_ms) => { - let (value, interval, metric_type) = match maybe_interval_ms { - None => (*value, 0, ddmetric_proto::metric_payload::MetricType::Count), + // our internal representation is in milliseconds but the expected output is in seconds + let maybe_interval = metric.interval_ms().map(|i| i.get() / 1000); + + let (points, metric_type) = match metric.value() { + MetricValue::Counter { value } => { + if let Some(interval) = maybe_interval { // When an interval is defined, it implies the value should be in a per-second form, // so we need to get back to seconds from our milliseconds-based interval, and then // divide our value by that amount as well. - Some(interval_ms) => ( - (*value) * 1000.0 / (interval_ms.get() as f64), - interval_ms.get() as i64 / 1000, + let value = *value / (interval as f64); + ( + vec![ddmetric_proto::metric_payload::MetricPoint { value, timestamp }], ddmetric_proto::metric_payload::MetricType::Rate, - ), - }; - let points = vec![ddmetric_proto::metric_payload::MetricPoint { value, timestamp }]; - (points, metric_type, interval) + ) + } else { + ( + vec![ddmetric_proto::metric_payload::MetricPoint { + value: *value, + timestamp, + }], + ddmetric_proto::metric_payload::MetricType::Count, + ) + } } - (MetricValue::Set { values }, _) => { - let points = vec![ddmetric_proto::metric_payload::MetricPoint { + MetricValue::Set { values } => ( + vec![ddmetric_proto::metric_payload::MetricPoint { value: values.len() as f64, timestamp, - }]; - let metric_type = ddmetric_proto::metric_payload::MetricType::Gauge; - let interval = 0; - (points, metric_type, interval) - } - (MetricValue::Gauge { value }, _) => { - let points = vec![ddmetric_proto::metric_payload::MetricPoint { + }], + ddmetric_proto::metric_payload::MetricType::Gauge, + ), + MetricValue::Gauge { value } => ( + vec![ddmetric_proto::metric_payload::MetricPoint { value: *value, timestamp, - }]; - let metric_type = ddmetric_proto::metric_payload::MetricType::Gauge; - let interval = 0; - (points, metric_type, interval) - } + }], + ddmetric_proto::metric_payload::MetricType::Gauge, + ), // NOTE: AggregatedSummary will have been previously split into counters and gauges during normalization - (value, _) => { + value => { // this case should have already been surfaced by encode_single_metric() so this should never be reached return Err(EncoderError::InvalidMetric { expected: "series", @@ -662,7 +666,7 @@ fn series_to_proto_message( // unit is omitted unit: "".to_string(), source_type_name, - interval, + interval: maybe_interval.unwrap_or(0) as i64, metadata, }) } @@ -822,6 +826,9 @@ fn generate_series_metrics( let ts = encode_timestamp(metric.timestamp()); let tags = Some(encode_tags(&tags)); + // our internal representation is in milliseconds but the expected output is in seconds + let maybe_interval = metric.interval_ms().map(|i| i.get() / 1000); + let event_metadata = metric.metadata(); let metadata = generate_series_metadata( event_metadata.datadog_origin_metadata(), @@ -831,56 +838,25 @@ fn generate_series_metrics( trace!(?metadata, "Generated series metadata."); - let results = match (metric.value(), metric.interval_ms()) { - (MetricValue::Counter { value }, maybe_interval_ms) => { - let (value, interval, metric_type) = match maybe_interval_ms { - None => (*value, None, DatadogMetricType::Count), + let (points, metric_type) = match metric.value() { + MetricValue::Counter { value } => { + if let Some(interval) = maybe_interval { // When an interval is defined, it implies the value should be in a per-second form, // so we need to get back to seconds from our milliseconds-based interval, and then // divide our value by that amount as well. - Some(interval_ms) => ( - (*value) * 1000.0 / (interval_ms.get() as f64), - Some(interval_ms.get() / 1000), - DatadogMetricType::Rate, - ), - }; - - vec![DatadogSeriesMetric { - metric: name, - r#type: metric_type, - interval, - points: vec![DatadogPoint(ts, value)], - tags, - host, - source_type_name, - device, - metadata, - }] + let value = *value / (interval as f64); + (vec![DatadogPoint(ts, value)], DatadogMetricType::Rate) + } else { + (vec![DatadogPoint(ts, *value)], DatadogMetricType::Count) + } } - (MetricValue::Set { values }, _) => vec![DatadogSeriesMetric { - metric: name, - r#type: DatadogMetricType::Gauge, - interval: None, - points: vec![DatadogPoint(ts, values.len() as f64)], - tags, - host, - source_type_name, - device, - metadata, - }], - (MetricValue::Gauge { value }, _) => vec![DatadogSeriesMetric { - metric: name, - r#type: DatadogMetricType::Gauge, - interval: None, - points: vec![DatadogPoint(ts, *value)], - tags, - host, - source_type_name, - device, - metadata, - }], + MetricValue::Set { values } => ( + vec![DatadogPoint(ts, values.len() as f64)], + DatadogMetricType::Gauge, + ), + MetricValue::Gauge { value } => (vec![DatadogPoint(ts, *value)], DatadogMetricType::Gauge), // NOTE: AggregatedSummary will have been previously split into counters and gauges during normalization - (value, _) => { + value => { return Err(EncoderError::InvalidMetric { expected: "series", metric_value: value.as_name(), @@ -888,7 +864,17 @@ fn generate_series_metrics( } }; - Ok(results) + Ok(vec![DatadogSeriesMetric { + metric: name, + r#type: metric_type, + interval: maybe_interval, + points, + tags, + host, + source_type_name, + device, + metadata, + }]) } fn get_compressor() -> Compressor { @@ -1244,6 +1230,63 @@ mod tests { } } + #[test] + fn encode_non_rate_metric_with_interval() { + // It is possible that the Agent sends Gauges with an interval set. This + // Occurs when the origin of the metric is Dogstatsd, where the interval + // is set to 10. + + let value = 423.1331; + let interval_ms = 10000; + + let gauge = Metric::new( + "basic_gauge", + MetricKind::Incremental, + MetricValue::Gauge { value }, + ) + .with_timestamp(Some(ts())) + .with_interval_ms(NonZeroU32::new(interval_ms)); + + let expected_value = value; // For gauge, the value should not be modified by interval + let expected_interval = interval_ms / 1000; + + // series v1 + { + // Encode the metric and make sure we did the rate conversion correctly. + let result = generate_series_metrics( + &gauge, + &None, + log_schema(), + DEFAULT_DD_ORIGIN_PRODUCT_VALUE, + ); + assert!(result.is_ok()); + + let metrics = result.unwrap(); + assert_eq!(metrics.len(), 1); + + let actual = &metrics[0]; + assert_eq!(actual.r#type, DatadogMetricType::Gauge); + assert_eq!(actual.interval, Some(expected_interval)); + assert_eq!(actual.points.len(), 1); + assert_eq!(actual.points[0].1, expected_value); + } + + // series v2 + { + let series_proto = series_to_proto_message( + &gauge, + &None, + log_schema(), + DEFAULT_DD_ORIGIN_PRODUCT_VALUE, + ) + .unwrap(); + assert_eq!(series_proto.r#type, 3); + assert_eq!(series_proto.interval, expected_interval as i64); + assert_eq!(series_proto.points.len(), 1); + assert_eq!(series_proto.points[0].value, expected_value); + } + } + #[test] fn encode_origin_metadata_pass_through() { let product = 10; diff --git a/src/sinks/datadog/metrics/integration_tests.rs b/src/sinks/datadog/metrics/integration_tests.rs index 8c993b22bca04..b8e21cc484d88 100644 --- a/src/sinks/datadog/metrics/integration_tests.rs +++ b/src/sinks/datadog/metrics/integration_tests.rs @@ -72,11 +72,15 @@ fn generate_counter_gauge_set() -> Vec { let ts = Utc::now().trunc_subsecs(3); let events = vec![ // gauge - Event::Metric(Metric::new( - "gauge", - MetricKind::Incremental, - MetricValue::Gauge { value: 5678.0 }, - )), + Event::Metric( + Metric::new( + "gauge", + MetricKind::Incremental, + MetricValue::Gauge { value: 5678.0 }, + ) + // Dogstatsd outputs gauges with an interval + .with_interval_ms(NonZeroU32::new(10000)), + ), // counter with interval Event::Metric( Metric::new( @@ -318,7 +322,7 @@ fn validate_protobuf_set_gauge_rate(request: &(Parts, Bytes)) { ddmetric_proto::metric_payload::MetricType::Gauge ); assert_eq!(gauge.points[0].value, 5678.0); - assert_eq!(gauge.interval, 0); + assert_eq!(gauge.interval, 10); } // validate counter w interval = rate diff --git a/src/sources/datadog_agent/metrics.rs b/src/sources/datadog_agent/metrics.rs index ee1c998c13ced..c6a7955f54d59 100644 --- a/src/sources/datadog_agent/metrics.rs +++ b/src/sources/datadog_agent/metrics.rs @@ -266,6 +266,33 @@ pub(crate) fn decode_ddseries_v2( let event_metadata = get_event_metadata(serie.metadata.as_ref()); + // It is possible to receive non-rate metrics from the Agent with an interval set. + // That interval can be applied with the `as_rate` function in the Datadog UI. + // The scenario this happens is when DogStatsD emits non-rate series metrics to the Agent, + // in which it sets an interval to 10. See + // - https://github.com/DataDog/datadog-agent/blob/9f0a85c926596ec9aebe2d8e1f2a8b1af6e45635/pkg/aggregator/time_sampler.go#L49C1-L49C1 + // - https://github.com/DataDog/datadog-agent/blob/209b70529caff9ec1c30b6b2eed27bce725ed153/pkg/aggregator/aggregator.go#L39 + // + // Note that DogStatsD is the only scenario this occurs; regular Agent checks/services do not set the + // interval for non-rate series metrics. + // + // Note that because Vector does not yet have a specific Metric type to handle Rate, + // we are distinguishing Rate from Count by setting an interval to Rate but not Count. + // Luckily, the only time a Count metric type is emitted by DogStatsD, is in the Sketch endpoint. + // (Regular Count metrics are emitted by DogStatsD as Rate metrics). + // + // In theory we should be safe to set this non-rate-interval to Count metrics below, but to be safe, + // we will only set it for Rate and Gauge. Since Rates already need an interval, the only "odd" case + // is Gauges. + // + // Ultimately if we had a unique internal representation of a Rate metric type, we wouldn't need to + // have special handling for the interval, we would just apply it to all metrics that it came in with. + let non_rate_interval = if serie.interval.is_positive() { + NonZeroU32::new(serie.interval as u32 * 1000) // incoming is seconds, convert to milliseconds + } else { + None + }; + serie.resources.into_iter().for_each(|r| { // As per https://github.com/DataDog/datadog-agent/blob/a62ac9fb13e1e5060b89e731b8355b2b20a07c5b/pkg/serializer/internal/metrics/iterable_series.go#L180-L189 // the hostname can be found in MetricSeries::resources and that is the only value stored there. @@ -323,6 +350,7 @@ pub(crate) fn decode_ddseries_v2( )) .with_tags(Some(tags.clone())) .with_namespace(namespace) + .with_interval_ms(non_rate_interval) }) .collect::>(), Ok(metric_payload::MetricType::Rate) => serie diff --git a/src/sources/datadog_agent/tests.rs b/src/sources/datadog_agent/tests.rs index c6a6fe6341218..a4d779ccb9975 100644 --- a/src/sources/datadog_agent/tests.rs +++ b/src/sources/datadog_agent/tests.rs @@ -1902,7 +1902,7 @@ async fn decode_series_endpoint_v2() { r#type: ddmetric_proto::metric_payload::MetricType::Gauge as i32, unit: "".to_string(), source_type_name: "a_random_source_type_name".to_string(), - interval: 0, + interval: 10, // Dogstatsd sets Gauge interval to 10 by default metadata: None, }, ddmetric_proto::metric_payload::MetricSeries { @@ -1982,6 +1982,13 @@ async fn decode_series_endpoint_v2() { ) ); assert_eq!(metric.kind(), MetricKind::Absolute); + assert_eq!( + metric + .interval_ms() + .expect("should have set interval") + .get(), + 10000 + ); assert_eq!(*metric.value(), MetricValue::Gauge { value: 3.14 }); assert_tags( metric, @@ -2006,6 +2013,13 @@ async fn decode_series_endpoint_v2() { ); assert_eq!(metric.kind(), MetricKind::Absolute); assert_eq!(*metric.value(), MetricValue::Gauge { value: 3.1415 }); + assert_eq!( + metric + .interval_ms() + .expect("should have set interval") + .get(), + 10000 + ); assert_tags( metric, metric_tags!(