Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(datadog_agent source, datadog_metrics sink): handle interval for non-rate series metrics #18889

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 116 additions & 73 deletions src/sinks/datadog/metrics/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -609,42 +609,46 @@ fn series_to_proto_message(

let timestamp = encode_timestamp(metric.timestamp());

let (points, metric_type, interval) = match (metric.value(), metric.interval_ms()) {
(MetricValue::Counter { value }, maybe_interval_ms) => {
let (value, interval, metric_type) = match maybe_interval_ms {
None => (*value, 0, ddmetric_proto::metric_payload::MetricType::Count),
// our internal representation is in milliseconds but the expected output is in seconds
let maybe_interval = metric.interval_ms().map(|i| i.get() / 1000);

let (points, metric_type) = match metric.value() {
MetricValue::Counter { value } => {
if let Some(interval) = maybe_interval {
// When an interval is defined, it implies the value should be in a per-second form,
// so we need to get back to seconds from our milliseconds-based interval, and then
// divide our value by that amount as well.
Some(interval_ms) => (
(*value) * 1000.0 / (interval_ms.get() as f64),
interval_ms.get() as i64 / 1000,
let value = *value / (interval as f64);
(
vec![ddmetric_proto::metric_payload::MetricPoint { value, timestamp }],
ddmetric_proto::metric_payload::MetricType::Rate,
),
};
let points = vec![ddmetric_proto::metric_payload::MetricPoint { value, timestamp }];
(points, metric_type, interval)
)
} else {
(
vec![ddmetric_proto::metric_payload::MetricPoint {
value: *value,
timestamp,
}],
ddmetric_proto::metric_payload::MetricType::Count,
)
}
}
(MetricValue::Set { values }, _) => {
let points = vec![ddmetric_proto::metric_payload::MetricPoint {
MetricValue::Set { values } => (
vec![ddmetric_proto::metric_payload::MetricPoint {
value: values.len() as f64,
timestamp,
}];
let metric_type = ddmetric_proto::metric_payload::MetricType::Gauge;
let interval = 0;
(points, metric_type, interval)
}
(MetricValue::Gauge { value }, _) => {
let points = vec![ddmetric_proto::metric_payload::MetricPoint {
}],
ddmetric_proto::metric_payload::MetricType::Gauge,
),
MetricValue::Gauge { value } => (
vec![ddmetric_proto::metric_payload::MetricPoint {
value: *value,
timestamp,
}];
let metric_type = ddmetric_proto::metric_payload::MetricType::Gauge;
let interval = 0;
(points, metric_type, interval)
}
}],
ddmetric_proto::metric_payload::MetricType::Gauge,
),
// NOTE: AggregatedSummary will have been previously split into counters and gauges during normalization
(value, _) => {
value => {
// this case should have already been surfaced by encode_single_metric() so this should never be reached
return Err(EncoderError::InvalidMetric {
expected: "series",
Expand All @@ -662,7 +666,7 @@ fn series_to_proto_message(
// unit is omitted
unit: "".to_string(),
source_type_name,
interval,
interval: maybe_interval.unwrap_or(0) as i64,
metadata,
})
}
Expand Down Expand Up @@ -822,6 +826,9 @@ fn generate_series_metrics(
let ts = encode_timestamp(metric.timestamp());
let tags = Some(encode_tags(&tags));

// our internal representation is in milliseconds but the expected output is in seconds
let maybe_interval = metric.interval_ms().map(|i| i.get() / 1000);

let event_metadata = metric.metadata();
let metadata = generate_series_metadata(
event_metadata.datadog_origin_metadata(),
Expand All @@ -831,64 +838,43 @@ fn generate_series_metrics(

trace!(?metadata, "Generated series metadata.");

let results = match (metric.value(), metric.interval_ms()) {
(MetricValue::Counter { value }, maybe_interval_ms) => {
let (value, interval, metric_type) = match maybe_interval_ms {
None => (*value, None, DatadogMetricType::Count),
let (points, metric_type) = match metric.value() {
MetricValue::Counter { value } => {
if let Some(interval) = maybe_interval {
// When an interval is defined, it implies the value should be in a per-second form,
// so we need to get back to seconds from our milliseconds-based interval, and then
// divide our value by that amount as well.
Some(interval_ms) => (
(*value) * 1000.0 / (interval_ms.get() as f64),
Some(interval_ms.get() / 1000),
DatadogMetricType::Rate,
),
};

vec![DatadogSeriesMetric {
metric: name,
r#type: metric_type,
interval,
points: vec![DatadogPoint(ts, value)],
tags,
host,
source_type_name,
device,
metadata,
}]
let value = *value / (interval as f64);
(vec![DatadogPoint(ts, value)], DatadogMetricType::Rate)
} else {
(vec![DatadogPoint(ts, *value)], DatadogMetricType::Count)
}
}
(MetricValue::Set { values }, _) => vec![DatadogSeriesMetric {
metric: name,
r#type: DatadogMetricType::Gauge,
interval: None,
points: vec![DatadogPoint(ts, values.len() as f64)],
tags,
host,
source_type_name,
device,
metadata,
}],
(MetricValue::Gauge { value }, _) => vec![DatadogSeriesMetric {
metric: name,
r#type: DatadogMetricType::Gauge,
interval: None,
points: vec![DatadogPoint(ts, *value)],
tags,
host,
source_type_name,
device,
metadata,
}],
MetricValue::Set { values } => (
vec![DatadogPoint(ts, values.len() as f64)],
DatadogMetricType::Gauge,
),
MetricValue::Gauge { value } => (vec![DatadogPoint(ts, *value)], DatadogMetricType::Gauge),
// NOTE: AggregatedSummary will have been previously split into counters and gauges during normalization
(value, _) => {
value => {
return Err(EncoderError::InvalidMetric {
expected: "series",
metric_value: value.as_name(),
})
}
};

Ok(results)
Ok(vec![DatadogSeriesMetric {
metric: name,
r#type: metric_type,
interval: maybe_interval,
points,
tags,
host,
source_type_name,
device,
metadata,
}])
}

fn get_compressor() -> Compressor {
Expand Down Expand Up @@ -1244,6 +1230,63 @@ mod tests {
}
}

// Verifies that a gauge carrying an interval is encoded with its value
// unchanged and with the interval (converted from milliseconds to seconds)
// still present, for both the v1 and v2 series encodings.
#[test]
fn encode_non_rate_metric_with_interval() {
// It is possible that the Agent sends gauges with an interval set. This
// occurs when the origin of the metric is DogStatsD, where the interval
// is set to 10 (seconds).

let value = 423.1331;
let interval_ms = 10000;

let gauge = Metric::new(
"basic_gauge",
MetricKind::Incremental,
MetricValue::Gauge { value },
)
.with_timestamp(Some(ts()))
.with_interval_ms(NonZeroU32::new(interval_ms));

let expected_value = value; // For a gauge, the value should not be modified by the interval
let expected_interval = interval_ms / 1000;

// series v1
{
// Encode the gauge and make sure the value is passed through untouched
// (no rate conversion) while the interval is still reported in seconds.
let result = generate_series_metrics(
&gauge,
&None,
log_schema(),
DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
);
assert!(result.is_ok());

let metrics = result.unwrap();
assert_eq!(metrics.len(), 1);

let actual = &metrics[0];
assert_eq!(actual.r#type, DatadogMetricType::Gauge);
assert_eq!(actual.interval, Some(expected_interval));
assert_eq!(actual.points.len(), 1);
assert_eq!(actual.points[0].1, expected_value);
}

// series v2
{
// Same expectations as v1, checked against the protobuf message.
let series_proto = series_to_proto_message(
&gauge,
&None,
log_schema(),
DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
)
.unwrap();
// NOTE(review): 3 is presumably the protobuf `MetricType::Gauge`
// discriminant -- confirm against the proto definition.
assert_eq!(series_proto.r#type, 3);
assert_eq!(series_proto.interval, expected_interval as i64);
assert_eq!(series_proto.points.len(), 1);
assert_eq!(series_proto.points[0].value, expected_value);
}
}

#[test]
fn encode_origin_metadata_pass_through() {
let product = 10;
Expand Down
16 changes: 10 additions & 6 deletions src/sinks/datadog/metrics/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,15 @@ fn generate_counter_gauge_set() -> Vec<Event> {
let ts = Utc::now().trunc_subsecs(3);
let events = vec![
// gauge
Event::Metric(Metric::new(
"gauge",
MetricKind::Incremental,
MetricValue::Gauge { value: 5678.0 },
)),
Event::Metric(
Metric::new(
"gauge",
MetricKind::Incremental,
MetricValue::Gauge { value: 5678.0 },
)
// Dogstatsd outputs gauges with an interval
.with_interval_ms(NonZeroU32::new(10000)),
),
// counter with interval
Event::Metric(
Metric::new(
Expand Down Expand Up @@ -318,7 +322,7 @@ fn validate_protobuf_set_gauge_rate(request: &(Parts, Bytes)) {
ddmetric_proto::metric_payload::MetricType::Gauge
);
assert_eq!(gauge.points[0].value, 5678.0);
assert_eq!(gauge.interval, 0);
assert_eq!(gauge.interval, 10);
}

// validate counter w interval = rate
Expand Down
28 changes: 28 additions & 0 deletions src/sources/datadog_agent/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,33 @@ pub(crate) fn decode_ddseries_v2(

let event_metadata = get_event_metadata(serie.metadata.as_ref());

// It is possible to receive non-rate metrics from the Agent with an interval set.
// That interval can be applied with the `as_rate` function in the Datadog UI.
// This happens when DogStatsD emits non-rate series metrics to the Agent,
// in which case it sets the interval to 10. See
// - https://github.com/DataDog/datadog-agent/blob/9f0a85c926596ec9aebe2d8e1f2a8b1af6e45635/pkg/aggregator/time_sampler.go#L49C1-L49C1
// - https://github.com/DataDog/datadog-agent/blob/209b70529caff9ec1c30b6b2eed27bce725ed153/pkg/aggregator/aggregator.go#L39
//
// Note that DogStatsD is the only scenario in which this occurs; regular Agent checks/services do not set the
// interval for non-rate series metrics.
//
// Note that because Vector does not yet have a specific Metric type to handle Rate,
// we are distinguishing Rate from Count by setting an interval on Rate but not on Count.
// Luckily, the only time a Count metric type is emitted by DogStatsD is in the Sketch endpoint.
// (Regular Count metrics are emitted by DogStatsD as Rate metrics.)
//
// In theory we should be safe to set this non-rate-interval to Count metrics below, but to be safe,
// we will only set it for Rate and Gauge. Since Rates already need an interval, the only "odd" case
// is Gauges.
//
// Ultimately if we had a unique internal representation of a Rate metric type, we wouldn't need to
// have special handling for the interval, we would just apply it to all metrics that it came in with.
let non_rate_interval = if serie.interval.is_positive() {
NonZeroU32::new(serie.interval as u32 * 1000) // incoming is seconds, convert to milliseconds
} else {
None
};

serie.resources.into_iter().for_each(|r| {
// As per https://github.com/DataDog/datadog-agent/blob/a62ac9fb13e1e5060b89e731b8355b2b20a07c5b/pkg/serializer/internal/metrics/iterable_series.go#L180-L189
// the hostname can be found in MetricSeries::resources and that is the only value stored there.
Expand Down Expand Up @@ -323,6 +350,7 @@ pub(crate) fn decode_ddseries_v2(
))
.with_tags(Some(tags.clone()))
.with_namespace(namespace)
.with_interval_ms(non_rate_interval)
})
.collect::<Vec<_>>(),
Ok(metric_payload::MetricType::Rate) => serie
Expand Down
16 changes: 15 additions & 1 deletion src/sources/datadog_agent/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1902,7 +1902,7 @@ async fn decode_series_endpoint_v2() {
r#type: ddmetric_proto::metric_payload::MetricType::Gauge as i32,
unit: "".to_string(),
source_type_name: "a_random_source_type_name".to_string(),
interval: 0,
interval: 10, // Dogstatsd sets Gauge interval to 10 by default
metadata: None,
},
ddmetric_proto::metric_payload::MetricSeries {
Expand Down Expand Up @@ -1982,6 +1982,13 @@ async fn decode_series_endpoint_v2() {
)
);
assert_eq!(metric.kind(), MetricKind::Absolute);
assert_eq!(
metric
.interval_ms()
.expect("should have set interval")
.get(),
10000
);
assert_eq!(*metric.value(), MetricValue::Gauge { value: 3.14 });
assert_tags(
metric,
Expand All @@ -2006,6 +2013,13 @@ async fn decode_series_endpoint_v2() {
);
assert_eq!(metric.kind(), MetricKind::Absolute);
assert_eq!(*metric.value(), MetricValue::Gauge { value: 3.1415 });
assert_eq!(
metric
.interval_ms()
.expect("should have set interval")
.get(),
10000
);
assert_tags(
metric,
metric_tags!(
Expand Down
Loading