From dea1537c082d85092e8865acca32c6d3a038c9da Mon Sep 17 00:00:00 2001 From: Noemi <45180344+unflxw@users.noreply.github.com> Date: Tue, 8 Aug 2023 16:02:27 +0200 Subject: [PATCH 1/3] feat(appsignal sink): Normalize metrics Implement a normaliser for the AppSignal sink to convert absolute counter metrics to incremental counters, and incremental gauges to absolute gauges. The AppSignal API ignores absolute counters and incremental gauges, so this change adds support for absolute counters and incremental gauges. This normaliser is inspired by the DataDog normaliser. --- src/sinks/appsignal/mod.rs | 1 + src/sinks/appsignal/normalizer.rs | 199 ++++++++++++++++++++++++++++++ src/sinks/appsignal/sink.rs | 15 ++- 3 files changed, 213 insertions(+), 2 deletions(-) create mode 100644 src/sinks/appsignal/normalizer.rs diff --git a/src/sinks/appsignal/mod.rs b/src/sinks/appsignal/mod.rs index 175c456bf6cbc..ecfa404b33dbf 100644 --- a/src/sinks/appsignal/mod.rs +++ b/src/sinks/appsignal/mod.rs @@ -8,6 +8,7 @@ mod config; mod encoder; +mod normalizer; mod request_builder; mod service; mod sink; diff --git a/src/sinks/appsignal/normalizer.rs b/src/sinks/appsignal/normalizer.rs new file mode 100644 index 0000000000000..c6d330d2f3a98 --- /dev/null +++ b/src/sinks/appsignal/normalizer.rs @@ -0,0 +1,199 @@ +use vector_core::event::{Metric, MetricValue}; + +use crate::sinks::util::buffer::metrics::{MetricNormalize, MetricSet}; + +#[derive(Default)] +pub(crate) struct AppsignalMetricsNormalizer; + +impl MetricNormalize for AppsignalMetricsNormalizer { + fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option { + // We only care about making sure that counters are incremental, and that gauges are + // always absolute. Other metric types are currently unsupported. + match &metric.value() { + // We always send counters as incremental and gauges as absolute. Realistically, any + // system sending an incremental gauge update is kind of doing it wrong, but alas. + MetricValue::Counter { .. } => state.make_incremental(metric), + MetricValue::Gauge { .. } => state.make_absolute(metric), + // Otherwise, send it through as-is. + _ => Some(metric), + } + } +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeSet; + + use vector_core::event::{Metric, MetricKind, MetricValue}; + + use super::AppsignalMetricsNormalizer; + use crate::sinks::util::buffer::metrics::{MetricNormalize, MetricSet}; + + fn get_counter(value: f64, kind: MetricKind) -> Metric { + Metric::new("counter", kind, MetricValue::Counter { value }) + } + + fn get_gauge(value: f64, kind: MetricKind) -> Metric { + Metric::new("gauge", kind, MetricValue::Gauge { value }) + } + + fn run_comparisons(inputs: Vec, expected_outputs: Vec>) { + let mut metric_set = MetricSet::default(); + let mut normalizer = AppsignalMetricsNormalizer; + + for (input, expected) in inputs.into_iter().zip(expected_outputs) { + let result = normalizer.normalize(&mut metric_set, input); + assert_eq!(result, expected); + } + } + + #[test] + fn absolute_counter() { + let first_value = 3.14; + let second_value = 8.675309; + + let counters = vec![ + get_counter(first_value, MetricKind::Absolute), + get_counter(second_value, MetricKind::Absolute), + ]; + + let expected_counters = vec![ + None, + Some(get_counter( + second_value - first_value, + MetricKind::Incremental, + )), + ]; + + run_comparisons(counters, expected_counters); + } + + #[test] + fn incremental_counter() { + let first_value = 3.14; + let second_value = 8.675309; + + let counters = vec![ + get_counter(first_value, MetricKind::Incremental), + get_counter(second_value, MetricKind::Incremental), + ]; + + let expected_counters = counters + .clone() + .into_iter() + .map(Option::Some) + .collect::>(); + + run_comparisons(counters, expected_counters); + } + + #[test] + fn mixed_counter() { + let first_value = 3.14; + let second_value = 8.675309; + let third_value = 16.19; + + let counters = vec![ + get_counter(first_value, MetricKind::Incremental), + get_counter(second_value, MetricKind::Absolute), + get_counter(third_value, MetricKind::Absolute), + get_counter(first_value, MetricKind::Absolute), + get_counter(second_value, MetricKind::Incremental), + get_counter(third_value, MetricKind::Incremental), + ]; + + let expected_counters = vec![ + Some(get_counter(first_value, MetricKind::Incremental)), + None, + Some(get_counter( + third_value - second_value, + MetricKind::Incremental, + )), + None, + Some(get_counter(second_value, MetricKind::Incremental)), + Some(get_counter(third_value, MetricKind::Incremental)), + ]; + + run_comparisons(counters, expected_counters); + } + + #[test] + fn absolute_gauge() { + let first_value = 3.14; + let second_value = 8.675309; + + let gauges = vec![ + get_gauge(first_value, MetricKind::Absolute), + get_gauge(second_value, MetricKind::Absolute), + ]; + + let expected_gauges = gauges + .clone() + .into_iter() + .map(Option::Some) + .collect::>(); + + run_comparisons(gauges, expected_gauges); + } + + #[test] + fn incremental_gauge() { + let first_value = 3.14; + let second_value = 8.675309; + + let gauges = vec![ + get_gauge(first_value, MetricKind::Incremental), + get_gauge(second_value, MetricKind::Incremental), + ]; + + let expected_gauges = vec![ + Some(get_gauge(first_value, MetricKind::Absolute)), + Some(get_gauge(first_value + second_value, MetricKind::Absolute)), + ]; + + run_comparisons(gauges, expected_gauges); + } + + #[test] + fn mixed_gauge() { + let first_value = 3.14; + let second_value = 8.675309; + let third_value = 16.19; + + let gauges = vec![ + get_gauge(first_value, MetricKind::Incremental), + get_gauge(second_value, MetricKind::Absolute), + get_gauge(third_value, MetricKind::Absolute), + get_gauge(first_value, MetricKind::Absolute), + get_gauge(second_value, MetricKind::Incremental), + get_gauge(third_value, MetricKind::Incremental), + ]; + + let expected_gauges = vec![ + Some(get_gauge(first_value, MetricKind::Absolute)), + Some(get_gauge(second_value, MetricKind::Absolute)), + Some(get_gauge(third_value, MetricKind::Absolute)), + Some(get_gauge(first_value, MetricKind::Absolute)), + Some(get_gauge(first_value + second_value, MetricKind::Absolute)), + Some(get_gauge( + first_value + second_value + third_value, + MetricKind::Absolute, + )), + ]; + + run_comparisons(gauges, expected_gauges); + } + + #[test] + fn other_metrics() { + let metric = Metric::new( + "set", + MetricKind::Incremental, + MetricValue::Set { + values: BTreeSet::new(), + }, + ); + + run_comparisons(vec![metric], vec![None]); + } +} diff --git a/src/sinks/appsignal/sink.rs b/src/sinks/appsignal/sink.rs index ab9b135829abb..b908ed757a45a 100644 --- a/src/sinks/appsignal/sink.rs +++ b/src/sinks/appsignal/sink.rs @@ -1,4 +1,5 @@ use futures::{stream::BoxStream, StreamExt}; +use futures_util::future::ready; use tower::{Service, ServiceBuilder}; use vector_core::{ event::Event, @@ -7,12 +8,14 @@ use vector_core::{ }; use crate::{ - codecs::Transformer, internal_events::SinkRequestBuildError, - sinks::util::builder::SinkBuilderExt, sinks::util::Compression, + codecs::Transformer, + internal_events::SinkRequestBuildError, + sinks::util::{buffer::metrics::MetricNormalizer, builder::SinkBuilderExt, Compression}, }; use super::{ encoder::AppsignalEncoder, + normalizer::AppsignalMetricsNormalizer, request_builder::{AppsignalRequest, AppsignalRequestBuilder}, }; @@ -32,8 +35,16 @@ where { pub(super) async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { let service = ServiceBuilder::new().service(self.service); + let mut normalizer = MetricNormalizer::::default(); input + .filter_map(move |event| { + ready(if let Event::Metric(metric) = event { + normalizer.normalize(metric).map(Event::Metric) + } else { + Some(event) + }) + }) .batched(self.batch_settings.into_byte_size_config()) .request_builder( None, From 4767d264f7a2e91177480470ea883a0a81df2ff5 Mon Sep 17 00:00:00 2001 From: Noemi <45180344+unflxw@users.noreply.github.com> Date: Fri, 18 Aug 2023 13:21:15 +0200 Subject: [PATCH 2/3] Refactor metric normalizer tests Move the methods that generate test metrics and the code that compares metric inputs and normalized outputs to the `test_util` metrics module. Previously these test helpers were duplicated across DataDog, StatsD and AppSignal's metric sinks. Rename the `run_comparisons` method to `assert_normalize`, as it asserts the results of running a normalizer's `.normalize` method. Move the duplicated test implementations to the `test_util::metrics` module, in a separate tests sub-module, and make them generic over the normalizer. Use these test definitions in the DataDog, StatsD and AppSignal's metric sink tests. --- src/sinks/appsignal/normalizer.rs | 147 +---------- src/sinks/datadog/metrics/normalizer.rs | 303 ++--------------------- src/sinks/statsd/normalizer.rs | 266 ++------------------ src/test_util/metrics.rs | 313 +++++++++++++++++++++++- 4 files changed, 363 insertions(+), 666 deletions(-) diff --git a/src/sinks/appsignal/normalizer.rs b/src/sinks/appsignal/normalizer.rs index c6d330d2f3a98..0108de6516b3f 100644 --- a/src/sinks/appsignal/normalizer.rs +++ b/src/sinks/appsignal/normalizer.rs @@ -24,164 +24,39 @@ impl MetricNormalize for AppsignalMetricsNormalizer { mod tests { use std::collections::BTreeSet; - use vector_core::event::{Metric, MetricKind, MetricValue}; + use crate::event::{Metric, MetricKind, MetricValue}; use super::AppsignalMetricsNormalizer; - use crate::sinks::util::buffer::metrics::{MetricNormalize, MetricSet}; - - fn get_counter(value: f64, kind: MetricKind) -> Metric { - Metric::new("counter", kind, MetricValue::Counter { value }) - } - - fn get_gauge(value: f64, kind: MetricKind) -> Metric { - Metric::new("gauge", kind, MetricValue::Gauge { value }) - } - - fn run_comparisons(inputs: Vec, expected_outputs: Vec>) { - let mut metric_set = MetricSet::default(); - let mut normalizer = AppsignalMetricsNormalizer; - - for (input, expected) in inputs.into_iter().zip(expected_outputs) { - let result = normalizer.normalize(&mut metric_set, input); - assert_eq!(result, expected); - } - } + use crate::test_util::metrics::{assert_normalize, tests}; #[test] fn absolute_counter() { - let first_value = 3.14; - let second_value = 8.675309; - - let counters = vec![ - get_counter(first_value, MetricKind::Absolute), - get_counter(second_value, MetricKind::Absolute), - ]; - - let expected_counters = vec![ - None, - Some(get_counter( - second_value - first_value, - MetricKind::Incremental, - )), - ]; - - run_comparisons(counters, expected_counters); + tests::absolute_counter_normalize_to_incremental(AppsignalMetricsNormalizer); } #[test] fn incremental_counter() { - let first_value = 3.14; - let second_value = 8.675309; - - let counters = vec![ - get_counter(first_value, MetricKind::Incremental), - get_counter(second_value, MetricKind::Incremental), - ]; - - let expected_counters = counters - .clone() - .into_iter() - .map(Option::Some) - .collect::>(); - - run_comparisons(counters, expected_counters); + tests::incremental_counter_normalize_to_incremental(AppsignalMetricsNormalizer); } #[test] fn mixed_counter() { - let first_value = 3.14; - let second_value = 8.675309; - let third_value = 16.19; - - let counters = vec![ - get_counter(first_value, MetricKind::Incremental), - get_counter(second_value, MetricKind::Absolute), - get_counter(third_value, MetricKind::Absolute), - get_counter(first_value, MetricKind::Absolute), - get_counter(second_value, MetricKind::Incremental), - get_counter(third_value, MetricKind::Incremental), - ]; - - let expected_counters = vec![ - Some(get_counter(first_value, MetricKind::Incremental)), - None, - Some(get_counter( - third_value - second_value, - MetricKind::Incremental, - )), - None, - Some(get_counter(second_value, MetricKind::Incremental)), - Some(get_counter(third_value, MetricKind::Incremental)), - ]; - - run_comparisons(counters, expected_counters); + tests::mixed_counter_normalize_to_incremental(AppsignalMetricsNormalizer); } #[test] fn absolute_gauge() { - let first_value = 3.14; - let second_value = 8.675309; - - let gauges = vec![ - get_gauge(first_value, MetricKind::Absolute), - get_gauge(second_value, MetricKind::Absolute), - ]; - - let expected_gauges = gauges - .clone() - .into_iter() - .map(Option::Some) - .collect::>(); - - run_comparisons(gauges, expected_gauges); + tests::absolute_gauge_normalize_to_absolute(AppsignalMetricsNormalizer); } #[test] fn incremental_gauge() { - let first_value = 3.14; - let second_value = 8.675309; - - let gauges = vec![ - get_gauge(first_value, MetricKind::Incremental), - get_gauge(second_value, MetricKind::Incremental), - ]; - - let expected_gauges = vec![ - Some(get_gauge(first_value, MetricKind::Absolute)), - Some(get_gauge(first_value + second_value, MetricKind::Absolute)), - ]; - - run_comparisons(gauges, expected_gauges); + tests::incremental_gauge_normalize_to_absolute(AppsignalMetricsNormalizer); } #[test] fn mixed_gauge() { - let first_value = 3.14; - let second_value = 8.675309; - let third_value = 16.19; - - let gauges = vec![ - get_gauge(first_value, MetricKind::Incremental), - get_gauge(second_value, MetricKind::Absolute), - get_gauge(third_value, MetricKind::Absolute), - get_gauge(first_value, MetricKind::Absolute), - get_gauge(second_value, MetricKind::Incremental), - get_gauge(third_value, MetricKind::Incremental), - ]; - - let expected_gauges = vec![ - Some(get_gauge(first_value, MetricKind::Absolute)), - Some(get_gauge(second_value, MetricKind::Absolute)), - Some(get_gauge(third_value, MetricKind::Absolute)), - Some(get_gauge(first_value, MetricKind::Absolute)), - Some(get_gauge(first_value + second_value, MetricKind::Absolute)), - Some(get_gauge( - first_value + second_value + third_value, - MetricKind::Absolute, - )), - ]; - - run_comparisons(gauges, expected_gauges); + tests::mixed_gauge_normalize_to_absolute(AppsignalMetricsNormalizer); } #[test] @@ -194,6 +69,10 @@ mod tests { }, ); - run_comparisons(vec![metric], vec![None]); + assert_normalize( + AppsignalMetricsNormalizer, + vec![metric.clone()], + vec![Some(metric)], + ); } } diff --git a/src/sinks/datadog/metrics/normalizer.rs b/src/sinks/datadog/metrics/normalizer.rs index d4e6430250e1c..6b6d27b1c0649 100644 --- a/src/sinks/datadog/metrics/normalizer.rs +++ b/src/sinks/datadog/metrics/normalizer.rs @@ -38,131 +38,16 @@ impl MetricNormalize for DatadogMetricsNormalizer { #[cfg(test)] mod tests { - use std::fmt; - use vector_core::{ - event::{ - metric::{Bucket, MetricSketch, Sample}, - Metric, MetricKind, MetricValue, StatisticKind, - }, + event::{metric::MetricSketch, Metric, MetricKind, MetricValue}, metrics::AgentDDSketch, }; use super::DatadogMetricsNormalizer; - use crate::sinks::util::buffer::metrics::{MetricNormalize, MetricSet}; - - fn buckets_from_samples(values: &[f64]) -> (Vec, f64, u64) { - // Generate buckets, and general statistics, for an input set of data. We only use this in - // tests, and so we have some semi-realistic buckets here, but mainly we use them for testing, - // not for most accurately/efficiently representing the input samples. - let bounds = &[ - 1.0, - 2.0, - 4.0, - 8.0, - 16.0, - 32.0, - 64.0, - 128.0, - 256.0, - 512.0, - 1024.0, - f64::INFINITY, - ]; - let mut buckets = bounds - .iter() - .map(|b| Bucket { - upper_limit: *b, - count: 0, - }) - .collect::>(); - - let mut sum = 0.0; - let mut count = 0; - for value in values { - for bucket in buckets.iter_mut() { - if *value <= bucket.upper_limit { - bucket.count += 1; - } - } - - sum += *value; - count += 1; - } - - (buckets, sum, count) - } - - fn generate_f64s(start: u16, end: u16) -> Vec { - assert!(start <= end); - let mut samples = Vec::new(); - for n in start..=end { - samples.push(f64::from(n)); - } - samples - } - - fn get_counter(value: f64, kind: MetricKind) -> Metric { - Metric::new("counter", kind, MetricValue::Counter { value }) - } - - fn get_gauge(value: f64, kind: MetricKind) -> Metric { - Metric::new("gauge", kind, MetricValue::Gauge { value }) - } - fn get_set(values: S, kind: MetricKind) -> Metric - where - S: IntoIterator, - V: fmt::Display, - { - Metric::new( - "set", - kind, - MetricValue::Set { - values: values.into_iter().map(|i| i.to_string()).collect(), - }, - ) - } - - fn get_distribution(samples: S, kind: MetricKind) -> Metric - where - S: IntoIterator, - V: Into, - { - Metric::new( - "distribution", - kind, - MetricValue::Distribution { - samples: samples - .into_iter() - .map(|n| Sample { - value: n.into(), - rate: 1, - }) - .collect(), - statistic: StatisticKind::Histogram, - }, - ) - } - - fn get_aggregated_histogram(samples: S, kind: MetricKind) -> Metric - where - S: IntoIterator, - V: Into, - { - let samples = samples.into_iter().map(Into::into).collect::>(); - let (buckets, sum, count) = buckets_from_samples(&samples); - - Metric::new( - "agg_histogram", - kind, - MetricValue::AggregatedHistogram { - buckets, - count, - sum, - }, - ) - } + use crate::test_util::metrics::{ + assert_normalize, generate_f64s, get_aggregated_histogram, get_distribution, tests, + }; fn get_sketch(name: N, samples: S, kind: MetricKind) -> Metric where @@ -183,199 +68,49 @@ mod tests { ) } - fn run_comparisons(inputs: Vec, expected_outputs: Vec>) { - let mut metric_set = MetricSet::default(); - let mut normalizer = DatadogMetricsNormalizer; - - for (input, expected) in inputs.into_iter().zip(expected_outputs) { - let result = normalizer.normalize(&mut metric_set, input); - assert_eq!(result, expected); - } - } - #[test] fn absolute_counter() { - let first_value = 3.14; - let second_value = 8.675309; - - let counters = vec![ - get_counter(first_value, MetricKind::Absolute), - get_counter(second_value, MetricKind::Absolute), - ]; - - let expected_counters = vec![ - None, - Some(get_counter( - second_value - first_value, - MetricKind::Incremental, - )), - ]; - - run_comparisons(counters, expected_counters); + tests::absolute_counter_normalize_to_incremental(DatadogMetricsNormalizer); } #[test] fn incremental_counter() { - let first_value = 3.14; - let second_value = 8.675309; - - let counters = vec![ - get_counter(first_value, MetricKind::Incremental), - get_counter(second_value, MetricKind::Incremental), - ]; - - let expected_counters = counters - .clone() - .into_iter() - .map(Option::Some) - .collect::>(); - - run_comparisons(counters, expected_counters); + tests::incremental_counter_normalize_to_incremental(DatadogMetricsNormalizer); } #[test] fn mixed_counter() { - let first_value = 3.14; - let second_value = 8.675309; - let third_value = 16.19; - - let counters = vec![ - get_counter(first_value, MetricKind::Incremental), - get_counter(second_value, MetricKind::Absolute), - get_counter(third_value, MetricKind::Absolute), - get_counter(first_value, MetricKind::Absolute), - get_counter(second_value, MetricKind::Incremental), - get_counter(third_value, MetricKind::Incremental), - ]; - - let expected_counters = vec![ - Some(get_counter(first_value, MetricKind::Incremental)), - None, - Some(get_counter( - third_value - second_value, - MetricKind::Incremental, - )), - None, - Some(get_counter(second_value, MetricKind::Incremental)), - Some(get_counter(third_value, MetricKind::Incremental)), - ]; - - run_comparisons(counters, expected_counters); + tests::mixed_counter_normalize_to_incremental(DatadogMetricsNormalizer); } #[test] fn absolute_gauge() { - let first_value = 3.14; - let second_value = 8.675309; - - let gauges = vec![ - get_gauge(first_value, MetricKind::Absolute), - get_gauge(second_value, MetricKind::Absolute), - ]; - - let expected_gauges = gauges - .clone() - .into_iter() - .map(Option::Some) - .collect::>(); - - run_comparisons(gauges, expected_gauges); + tests::absolute_gauge_normalize_to_absolute(DatadogMetricsNormalizer); } #[test] fn incremental_gauge() { - let first_value = 3.14; - let second_value = 8.675309; - - let gauges = vec![ - get_gauge(first_value, MetricKind::Incremental), - get_gauge(second_value, MetricKind::Incremental), - ]; - - let expected_gauges = vec![ - Some(get_gauge(first_value, MetricKind::Absolute)), - Some(get_gauge(first_value + second_value, MetricKind::Absolute)), - ]; - - run_comparisons(gauges, expected_gauges); + tests::incremental_gauge_normalize_to_absolute(DatadogMetricsNormalizer); } #[test] fn mixed_gauge() { - let first_value = 3.14; - let second_value = 8.675309; - let third_value = 16.19; - - let gauges = vec![ - get_gauge(first_value, MetricKind::Incremental), - get_gauge(second_value, MetricKind::Absolute), - get_gauge(third_value, MetricKind::Absolute), - get_gauge(first_value, MetricKind::Absolute), - get_gauge(second_value, MetricKind::Incremental), - get_gauge(third_value, MetricKind::Incremental), - ]; - - let expected_gauges = vec![ - Some(get_gauge(first_value, MetricKind::Absolute)), - Some(get_gauge(second_value, MetricKind::Absolute)), - Some(get_gauge(third_value, MetricKind::Absolute)), - Some(get_gauge(first_value, MetricKind::Absolute)), - Some(get_gauge(first_value + second_value, MetricKind::Absolute)), - Some(get_gauge( - first_value + second_value + third_value, - MetricKind::Absolute, - )), - ]; - - run_comparisons(gauges, expected_gauges); + tests::mixed_gauge_normalize_to_absolute(DatadogMetricsNormalizer); } #[test] fn absolute_set() { - let sets = vec![ - get_set(1..=20, MetricKind::Absolute), - get_set(15..=25, MetricKind::Absolute), - ]; - - let expected_sets = vec![None, Some(get_set(21..=25, MetricKind::Incremental))]; - - run_comparisons(sets, expected_sets); + tests::absolute_set_normalize_to_incremental(DatadogMetricsNormalizer); } #[test] fn incremental_set() { - let sets = vec![ - get_set(1..=20, MetricKind::Incremental), - get_set(15..=25, MetricKind::Incremental), - ]; - - let expected_sets = vec![ - Some(get_set(1..=20, MetricKind::Incremental)), - Some(get_set(15..=25, MetricKind::Incremental)), - ]; - - run_comparisons(sets, expected_sets); + tests::incremental_set_normalize_to_incremental(DatadogMetricsNormalizer); } #[test] fn mixed_set() { - let sets = vec![ - get_set(1..=20, MetricKind::Incremental), - get_set(10..=16, MetricKind::Absolute), - get_set(15..=25, MetricKind::Absolute), - get_set(1..5, MetricKind::Incremental), - get_set(3..=42, MetricKind::Incremental), - ]; - - let expected_sets = vec![ - Some(get_set(1..=20, MetricKind::Incremental)), - None, - Some(get_set(17..=25, MetricKind::Incremental)), - Some(get_set(1..5, MetricKind::Incremental)), - Some(get_set(3..=42, MetricKind::Incremental)), - ]; - - run_comparisons(sets, expected_sets); + tests::mixed_set_normalize_to_incremental(DatadogMetricsNormalizer); } #[test] @@ -401,7 +136,7 @@ mod tests { )), ]; - run_comparisons(distributions, expected_sketches); + assert_normalize(DatadogMetricsNormalizer, distributions, expected_sketches); } #[test] @@ -429,7 +164,7 @@ mod tests { )), ]; - run_comparisons(distributions, expected_sketches); + assert_normalize(DatadogMetricsNormalizer, distributions, expected_sketches); } #[test] @@ -476,7 +211,7 @@ mod tests { )), ]; - run_comparisons(distributions, expected_sketches); + assert_normalize(DatadogMetricsNormalizer, distributions, expected_sketches); } #[test] @@ -501,7 +236,7 @@ mod tests { ), ]; - run_comparisons(agg_histograms, expected_sketches); + assert_normalize(DatadogMetricsNormalizer, agg_histograms, expected_sketches); } #[test] @@ -533,7 +268,7 @@ mod tests { ), ]; - run_comparisons(agg_histograms, expected_sketches); + assert_normalize(DatadogMetricsNormalizer, agg_histograms, expected_sketches); } #[test] @@ -588,6 +323,6 @@ mod tests { ), ]; - run_comparisons(agg_histograms, expected_sketches); + assert_normalize(DatadogMetricsNormalizer, agg_histograms, expected_sketches); } } diff --git a/src/sinks/statsd/normalizer.rs b/src/sinks/statsd/normalizer.rs index e497d074dfb16..2592292e7023c 100644 --- a/src/sinks/statsd/normalizer.rs +++ b/src/sinks/statsd/normalizer.rs @@ -20,226 +20,33 @@ impl MetricNormalize for StatsdNormalizer { #[cfg(test)] mod tests { - use std::fmt; - - use vector_core::event::{ - metric::{Bucket, Sample}, - Metric, MetricKind, MetricValue, StatisticKind, - }; + use vector_core::event::MetricKind; use super::StatsdNormalizer; - use crate::sinks::util::buffer::metrics::{MetricNormalize, MetricSet}; - - fn buckets_from_samples(values: &[f64]) -> (Vec, f64, u64) { - // Generate buckets, and general statistics, for an input set of data. We only use this in - // tests, and so we have some semi-realistic buckets here, but mainly we use them for testing, - // not for most accurately/efficiently representing the input samples. - let bounds = &[ - 1.0, - 2.0, - 4.0, - 8.0, - 16.0, - 32.0, - 64.0, - 128.0, - 256.0, - 512.0, - 1024.0, - f64::INFINITY, - ]; - let mut buckets = bounds - .iter() - .map(|b| Bucket { - upper_limit: *b, - count: 0, - }) - .collect::>(); - - let mut sum = 0.0; - let mut count = 0; - for value in values { - for bucket in buckets.iter_mut() { - if *value <= bucket.upper_limit { - bucket.count += 1; - } - } - - sum += *value; - count += 1; - } - - (buckets, sum, count) - } - - fn generate_f64s(start: u16, end: u16) -> Vec { - assert!(start <= end); - let mut samples = Vec::new(); - for n in start..=end { - samples.push(f64::from(n)); - } - samples - } - - fn get_counter(value: f64, kind: MetricKind) -> Metric { - Metric::new("counter", kind, MetricValue::Counter { value }) - } - - fn get_gauge(value: f64, kind: MetricKind) -> Metric { - Metric::new("gauge", kind, MetricValue::Gauge { value }) - } - - fn get_set(values: S, kind: MetricKind) -> Metric - where - S: IntoIterator, - V: fmt::Display, - { - Metric::new( - "set", - kind, - MetricValue::Set { - values: values.into_iter().map(|i| i.to_string()).collect(), - }, - ) - } - - fn get_distribution(samples: S, kind: MetricKind) -> Metric - where - S: IntoIterator, - V: Into, - { - Metric::new( - "distribution", - kind, - MetricValue::Distribution { - samples: samples - .into_iter() - .map(|n| Sample { - value: n.into(), - rate: 1, - }) - .collect(), - statistic: StatisticKind::Histogram, - }, - ) - } - - fn get_aggregated_histogram(samples: S, kind: MetricKind) -> Metric - where - S: IntoIterator, - V: Into, - { - let samples = samples.into_iter().map(Into::into).collect::>(); - let (buckets, sum, count) = buckets_from_samples(&samples); - - Metric::new( - "agg_histogram", - kind, - MetricValue::AggregatedHistogram { - buckets, - count, - sum, - }, - ) - } - fn run_comparisons(inputs: Vec, expected_outputs: Vec>) { - let mut metric_set = MetricSet::default(); - let mut normalizer = StatsdNormalizer; - - for (input, expected) in inputs.into_iter().zip(expected_outputs) { - let result = normalizer.normalize(&mut metric_set, input); - assert_eq!(result, expected); - } - } + use crate::test_util::metrics::{ + assert_normalize, generate_f64s, get_aggregated_histogram, get_distribution, get_gauge, + tests, + }; #[test] fn absolute_counter() { - let first_value = 3.14; - let second_value = 8.675309; - - let counters = vec![ - get_counter(first_value, MetricKind::Absolute), - get_counter(second_value, MetricKind::Absolute), - ]; - - let expected_counters = vec![ - None, - Some(get_counter( - second_value - first_value, - MetricKind::Incremental, - )), - ]; - - run_comparisons(counters, expected_counters); + tests::absolute_counter_normalize_to_incremental(StatsdNormalizer); } #[test] fn incremental_counter() { - let first_value = 3.14; - let second_value = 8.675309; - - let counters = vec![ - get_counter(first_value, MetricKind::Incremental), - get_counter(second_value, MetricKind::Incremental), - ]; - - let expected_counters = counters - .clone() - .into_iter() - .map(Option::Some) - .collect::>(); - - run_comparisons(counters, expected_counters); + tests::incremental_counter_normalize_to_incremental(StatsdNormalizer); } #[test] fn mixed_counter() { - let first_value = 3.14; - let second_value = 8.675309; - let third_value = 16.19; - - let counters = vec![ - get_counter(first_value, MetricKind::Incremental), - get_counter(second_value, MetricKind::Absolute), - get_counter(third_value, MetricKind::Absolute), - get_counter(first_value, MetricKind::Absolute), - get_counter(second_value, MetricKind::Incremental), - get_counter(third_value, MetricKind::Incremental), - ]; - - let expected_counters = vec![ - Some(get_counter(first_value, MetricKind::Incremental)), - None, - Some(get_counter( - third_value - second_value, - MetricKind::Incremental, - )), - None, - Some(get_counter(second_value, MetricKind::Incremental)), - Some(get_counter(third_value, MetricKind::Incremental)), - ]; - - run_comparisons(counters, expected_counters); + tests::mixed_counter_normalize_to_incremental(StatsdNormalizer); } #[test] fn absolute_gauge() { - let first_value = 3.14; - let second_value = 8.675309; - - let gauges = vec![ - get_gauge(first_value, MetricKind::Absolute), - get_gauge(second_value, MetricKind::Absolute), - ]; - - let expected_gauges = gauges - .clone() - .into_iter() - .map(Option::Some) - .collect::>(); - - run_comparisons(gauges, expected_gauges); + tests::absolute_gauge_normalize_to_absolute(StatsdNormalizer); } #[test] @@ -258,7 +65,7 @@ mod tests { .map(Option::Some) .collect::>(); - run_comparisons(gauges, expected_gauges); + assert_normalize(StatsdNormalizer, gauges, expected_gauges); } #[test] @@ -282,55 +89,22 @@ mod tests { .map(Option::Some) .collect::>(); - run_comparisons(gauges, expected_gauges); + assert_normalize(StatsdNormalizer, gauges, expected_gauges); } #[test] fn absolute_set() { - let sets = vec![ - get_set(1..=20, MetricKind::Absolute), - get_set(15..=25, MetricKind::Absolute), - ]; - - let expected_sets = vec![None, Some(get_set(21..=25, MetricKind::Incremental))]; - - run_comparisons(sets, expected_sets); + tests::absolute_set_normalize_to_incremental(StatsdNormalizer); } #[test] fn incremental_set() { - let sets = vec![ - get_set(1..=20, MetricKind::Incremental), - get_set(15..=25, MetricKind::Incremental), - ]; - - let expected_sets = vec![ - Some(get_set(1..=20, MetricKind::Incremental)), - Some(get_set(15..=25, MetricKind::Incremental)), - ]; - - run_comparisons(sets, expected_sets); + tests::incremental_set_normalize_to_incremental(StatsdNormalizer); } #[test] fn mixed_set() { - let sets = vec![ - get_set(1..=20, MetricKind::Incremental), - get_set(10..=16, MetricKind::Absolute), - get_set(15..=25, MetricKind::Absolute), - get_set(1..5, MetricKind::Incremental), - get_set(3..=42, MetricKind::Incremental), - ]; - - let expected_sets = vec![ - Some(get_set(1..=20, MetricKind::Incremental)), - None, - Some(get_set(17..=25, MetricKind::Incremental)), - Some(get_set(1..5, MetricKind::Incremental)), - Some(get_set(3..=42, MetricKind::Incremental)), - ]; - - run_comparisons(sets, expected_sets); + tests::mixed_set_normalize_to_incremental(StatsdNormalizer); } #[test] @@ -349,7 +123,7 @@ mod tests { Some(get_distribution(expected_samples, MetricKind::Incremental)), ]; - run_comparisons(distributions, expected_distributions); + assert_normalize(StatsdNormalizer, distributions, expected_distributions); } #[test] @@ -364,7 +138,7 @@ mod tests { let expected_distributions = distributions.iter().cloned().map(Some).collect(); - run_comparisons(distributions, expected_distributions); + assert_normalize(StatsdNormalizer, distributions, expected_distributions); } #[test] @@ -394,7 +168,7 @@ mod tests { Some(distributions[4].clone()), ]; - run_comparisons(distributions, expected_distributions); + assert_normalize(StatsdNormalizer, distributions, expected_distributions); } #[test] @@ -409,7 +183,7 @@ mod tests { let expected_agg_histograms = vec![]; - run_comparisons(agg_histograms, expected_agg_histograms); + assert_normalize(StatsdNormalizer, agg_histograms, expected_agg_histograms); } #[test] @@ -428,7 +202,7 @@ mod tests { .map(Option::Some) .collect::>(); - run_comparisons(agg_histograms, expected_agg_histograms); + assert_normalize(StatsdNormalizer, agg_histograms, expected_agg_histograms); } #[test] @@ -449,6 +223,6 @@ mod tests { let expected_agg_histograms = vec![]; - run_comparisons(agg_histograms, expected_agg_histograms); + assert_normalize(StatsdNormalizer, agg_histograms, expected_agg_histograms); } } diff --git a/src/test_util/metrics.rs b/src/test_util/metrics.rs index 9a3bdca726b71..749df9d736ce9 100644 --- a/src/test_util/metrics.rs +++ b/src/test_util/metrics.rs @@ -1,10 +1,13 @@ use std::collections::{HashMap, HashSet}; +use std::fmt::Display; + use vector_core::event::{ - metric::{MetricData, MetricSeries, Sample}, - Event, EventMetadata, Metric, MetricValue, + metric::{Bucket, MetricData, MetricSeries, Sample}, + Event, EventMetadata, Metric, MetricValue, StatisticKind, }; +use crate::event::MetricKind; use crate::sinks::util::buffer::metrics::{MetricNormalize, MetricSet}; type SplitMetrics = HashMap; @@ -246,3 +249,309 @@ pub fn assert_set(metrics: &SplitMetrics, series: MetricSeries, expected_values: assert_eq!(actual_values, expected_values); } + +fn buckets_from_samples(values: &[f64]) -> (Vec, f64, u64) { + // Generate buckets, and general statistics, for an input set of data. We only use this in + // tests, and so we have some semi-realistic buckets here, but mainly we use them for testing, + // not for most accurately/efficiently representing the input samples. + let bounds = &[ + 1.0, + 2.0, + 4.0, + 8.0, + 16.0, + 32.0, + 64.0, + 128.0, + 256.0, + 512.0, + 1024.0, + f64::INFINITY, + ]; + let mut buckets = bounds + .iter() + .map(|b| Bucket { + upper_limit: *b, + count: 0, + }) + .collect::>(); + + let mut sum = 0.0; + let mut count = 0; + for value in values { + for bucket in buckets.iter_mut() { + if *value <= bucket.upper_limit { + bucket.count += 1; + } + } + + sum += *value; + count += 1; + } + + (buckets, sum, count) +} + +pub fn generate_f64s(start: u16, end: u16) -> Vec { + assert!(start <= end); + let mut samples = Vec::new(); + for n in start..=end { + samples.push(f64::from(n)); + } + samples +} + +pub fn get_set(values: S, kind: MetricKind) -> Metric +where + S: IntoIterator, + V: Display, +{ + Metric::new( + "set", + kind, + MetricValue::Set { + values: values.into_iter().map(|i| i.to_string()).collect(), + }, + ) +} + +pub fn get_distribution(samples: S, kind: MetricKind) -> Metric +where + S: IntoIterator, + V: Into, +{ + Metric::new( + "distribution", + kind, + MetricValue::Distribution { + samples: samples + .into_iter() + .map(|n| Sample { + value: n.into(), + rate: 1, + }) + .collect(), + statistic: StatisticKind::Histogram, + }, + ) +} + +pub fn get_aggregated_histogram(samples: S, kind: MetricKind) -> Metric +where + S: IntoIterator, + V: Into, +{ + let samples = samples.into_iter().map(Into::into).collect::>(); + let (buckets, sum, count) = buckets_from_samples(&samples); + + Metric::new( + "agg_histogram", + kind, + MetricValue::AggregatedHistogram { + buckets, + count, + sum, + }, + ) +} + +pub fn get_counter(value: f64, kind: MetricKind) -> Metric { + Metric::new("counter", kind, MetricValue::Counter { value }) +} + +pub fn get_gauge(value: f64, kind: MetricKind) -> Metric { + Metric::new("gauge", kind, MetricValue::Gauge { value }) +} + +pub fn assert_normalize( + mut normalizer: N, + inputs: Vec, + expected_outputs: Vec>, +) { + let mut metric_set = MetricSet::default(); + + for (input, expected) in inputs.into_iter().zip(expected_outputs) { + let result = normalizer.normalize(&mut metric_set, input); + assert_eq!(result, expected); + } +} + +pub mod tests { + use super::*; + + pub fn absolute_counter_normalize_to_incremental(normalizer: N) { + let first_value = 3.14; + let second_value = 8.675309; + + let counters = vec![ + get_counter(first_value, MetricKind::Absolute), + get_counter(second_value, MetricKind::Absolute), + ]; + + let expected_counters = vec![ + None, + Some(get_counter( + second_value - first_value, + MetricKind::Incremental, + )), + ]; + + assert_normalize(normalizer, counters, expected_counters); + } + + pub fn incremental_counter_normalize_to_incremental(normalizer: N) { + let first_value = 3.14; + let second_value = 8.675309; + + let counters = vec![ + get_counter(first_value, MetricKind::Incremental), + get_counter(second_value, MetricKind::Incremental), + ]; + + let expected_counters = counters + .clone() + .into_iter() + .map(Option::Some) + .collect::>(); + + assert_normalize(normalizer, counters, expected_counters); + } + + pub fn mixed_counter_normalize_to_incremental(normalizer: N) { + let first_value = 3.14; + let second_value = 8.675309; + let third_value = 16.19; + + let counters = vec![ + get_counter(first_value, MetricKind::Incremental), + get_counter(second_value, MetricKind::Absolute), + get_counter(third_value, MetricKind::Absolute), + get_counter(first_value, MetricKind::Absolute), + get_counter(second_value, MetricKind::Incremental), + get_counter(third_value, MetricKind::Incremental), + ]; + + let expected_counters = vec![ + Some(get_counter(first_value, MetricKind::Incremental)), + None, + Some(get_counter( + third_value - second_value, + MetricKind::Incremental, + )), + None, + Some(get_counter(second_value, MetricKind::Incremental)), + Some(get_counter(third_value, MetricKind::Incremental)), + ]; + + assert_normalize(normalizer, counters, expected_counters); + } + + pub fn absolute_gauge_normalize_to_absolute(normalizer: N) { + let first_value = 3.14; + let second_value = 8.675309; + + let gauges = vec![ + get_gauge(first_value, MetricKind::Absolute), + get_gauge(second_value, MetricKind::Absolute), + ]; + + let expected_gauges = gauges + .clone() + .into_iter() + .map(Option::Some) + .collect::>(); + + assert_normalize(normalizer, gauges, expected_gauges); + } + + pub fn incremental_gauge_normalize_to_absolute(normalizer: N) { + let first_value = 3.14; + let second_value = 8.675309; + + let gauges = vec![ + get_gauge(first_value, MetricKind::Incremental), + get_gauge(second_value, MetricKind::Incremental), + ]; + + let expected_gauges = vec![ + Some(get_gauge(first_value, MetricKind::Absolute)), + Some(get_gauge(first_value + second_value, MetricKind::Absolute)), + ]; + + assert_normalize(normalizer, gauges, expected_gauges); + } + + pub fn mixed_gauge_normalize_to_absolute(normalizer: N) { + let first_value = 3.14; + let second_value = 8.675309; + let third_value = 16.19; + + let gauges = vec![ + get_gauge(first_value, MetricKind::Incremental), + get_gauge(second_value, MetricKind::Absolute), + get_gauge(third_value, MetricKind::Absolute), + get_gauge(first_value, MetricKind::Absolute), + get_gauge(second_value, MetricKind::Incremental), + get_gauge(third_value, MetricKind::Incremental), + ]; + + let expected_gauges = vec![ + Some(get_gauge(first_value, MetricKind::Absolute)), + Some(get_gauge(second_value, MetricKind::Absolute)), + Some(get_gauge(third_value, MetricKind::Absolute)), + Some(get_gauge(first_value, MetricKind::Absolute)), + Some(get_gauge(first_value + second_value, MetricKind::Absolute)), + Some(get_gauge( + first_value + second_value + third_value, + MetricKind::Absolute, + )), + ]; + + assert_normalize(normalizer, gauges, expected_gauges); + } + + pub fn absolute_set_normalize_to_incremental(normalizer: N) { + let sets = vec![ + get_set(1..=20, MetricKind::Absolute), + get_set(15..=25, MetricKind::Absolute), + ]; + + let expected_sets = vec![None, Some(get_set(21..=25, MetricKind::Incremental))]; + + assert_normalize(normalizer, sets, expected_sets); + } + + pub fn incremental_set_normalize_to_incremental(normalizer: N) { + let sets = vec![ + get_set(1..=20, MetricKind::Incremental), + get_set(15..=25, MetricKind::Incremental), + ]; + + let expected_sets = vec![ + Some(get_set(1..=20, MetricKind::Incremental)), + Some(get_set(15..=25, MetricKind::Incremental)), + ]; + + assert_normalize(normalizer, sets, expected_sets); + } + + pub fn mixed_set_normalize_to_incremental(normalizer: N) { + let sets = vec![ + get_set(1..=20, MetricKind::Incremental), + get_set(10..=16, MetricKind::Absolute), + get_set(15..=25, MetricKind::Absolute), + get_set(1..5, MetricKind::Incremental), + get_set(3..=42, MetricKind::Incremental), + ]; + + let expected_sets = vec![ + Some(get_set(1..=20, MetricKind::Incremental)), + None, + Some(get_set(17..=25, MetricKind::Incremental)), + Some(get_set(1..5, MetricKind::Incremental)), + Some(get_set(3..=42, MetricKind::Incremental)), + ]; + + assert_normalize(normalizer, sets, expected_sets); + } +} From a23f0463b4d089816961f8121b3133990c06d7af Mon Sep 17 00:00:00 2001 From: Noemi <45180344+unflxw@users.noreply.github.com> Date: Mon, 21 Aug 2023 14:02:48 +0200 Subject: [PATCH 3/3] Fix integration tests Since the AppSignal sink now normalises counters from absolute to incremental, absolute counters that are only emitted once do not result in an outgoing HTTP request being emitted by the sink. Address this by emitting the absolute counters in the tests at least twice. This also implicitly tests the metrics' normalisation. --- src/sinks/appsignal/integration_tests.rs | 52 ++++++++++++++++-------- 1 file changed, 34 insertions(+), 18 deletions(-) diff --git a/src/sinks/appsignal/integration_tests.rs b/src/sinks/appsignal/integration_tests.rs index 1c28032caf40d..8054da4eb92cc 100644 --- a/src/sinks/appsignal/integration_tests.rs +++ b/src/sinks/appsignal/integration_tests.rs @@ -98,14 +98,23 @@ async fn metrics_real_endpoint() { #[tokio::test] async fn metrics_shape() { let events: Vec<_> = (0..5) - .map(|index| { - Event::Metric(Metric::new( - format!("counter_{}", index), - MetricKind::Absolute, - MetricValue::Counter { - value: index as f64, - }, - )) + .flat_map(|index| { + vec![ + Event::Metric(Metric::new( + format!("counter_{}", index), + MetricKind::Absolute, + MetricValue::Counter { + value: index as f64, + }, + )), + Event::Metric(Metric::new( + format!("counter_{}", index), + MetricKind::Absolute, + MetricValue::Counter { + value: (index + index) as f64, + }, + )), + ] }) .collect(); let api_key = push_api_key(); @@ -146,11 +155,11 @@ async fn metrics_shape() { .collect(); assert_eq!( vec![ - ("counter_0", "absolute", 0.0), - ("counter_1", "absolute", 1.0), - ("counter_2", "absolute", 2.0), - ("counter_3", "absolute", 3.0), - ("counter_4", "absolute", 4.0), + ("counter_0", "incremental", 0.0), + ("counter_1", "incremental", 1.0), + ("counter_2", "incremental", 2.0), + ("counter_3", "incremental", 3.0), + ("counter_4", "incremental", 4.0), ], metrics ); @@ -231,11 +240,18 @@ async fn error_scenario_real_endpoint() { let (sink, _) = config.build(cx).await.unwrap(); let (batch, receiver) = BatchNotifier::new_with_receiver(); - let events = vec![Event::Metric(Metric::new( - "counter", - MetricKind::Absolute, - MetricValue::Counter { value: 1.0 }, - ))]; + let events = vec![ + Event::Metric(Metric::new( + "counter", + MetricKind::Absolute, + MetricValue::Counter { value: 1.0 }, + )), + Event::Metric(Metric::new( + "counter", + MetricKind::Absolute, + MetricValue::Counter { value: 2.0 }, + )), + ]; let stream = map_event_batch_stream(stream::iter(events.clone()), Some(batch)); sink.run(stream).await.unwrap();