Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(appsignal sink): Normalize metrics #18217

Merged
merged 3 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/sinks/appsignal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

mod config;
mod encoder;
mod normalizer;
mod request_builder;
mod service;
mod sink;
Expand Down
199 changes: 199 additions & 0 deletions src/sinks/appsignal/normalizer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
use vector_core::event::{Metric, MetricValue};

use crate::sinks::util::buffer::metrics::{MetricNormalize, MetricSet};

#[derive(Default)]
pub(crate) struct AppsignalMetricsNormalizer;

impl MetricNormalize for AppsignalMetricsNormalizer {
fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option<Metric> {
// We only care about making sure that counters are incremental, and that gauges are
// always absolute. Other metric types are currently unsupported.
match &metric.value() {
// We always send counters as incremental and gauges as absolute. Realistically, any
// system sending an incremental gauge update is kind of doing it wrong, but alas.
MetricValue::Counter { .. } => state.make_incremental(metric),
MetricValue::Gauge { .. } => state.make_absolute(metric),
// Otherwise, send it through as-is.
_ => Some(metric),
}
}
}

#[cfg(test)]
mod tests {
use std::collections::BTreeSet;

use vector_core::event::{Metric, MetricKind, MetricValue};

use super::AppsignalMetricsNormalizer;
use crate::sinks::util::buffer::metrics::{MetricNormalize, MetricSet};

fn get_counter(value: f64, kind: MetricKind) -> Metric {
Metric::new("counter", kind, MetricValue::Counter { value })
}

fn get_gauge(value: f64, kind: MetricKind) -> Metric {
Metric::new("gauge", kind, MetricValue::Gauge { value })
}

fn run_comparisons(inputs: Vec<Metric>, expected_outputs: Vec<Option<Metric>>) {
let mut metric_set = MetricSet::default();
let mut normalizer = AppsignalMetricsNormalizer;

for (input, expected) in inputs.into_iter().zip(expected_outputs) {
let result = normalizer.normalize(&mut metric_set, input);
assert_eq!(result, expected);
}
}
unflxw marked this conversation as resolved.
Show resolved Hide resolved

#[test]
fn absolute_counter() {
let first_value = 3.14;
let second_value = 8.675309;

let counters = vec![
get_counter(first_value, MetricKind::Absolute),
get_counter(second_value, MetricKind::Absolute),
];

let expected_counters = vec![
None,
Some(get_counter(
second_value - first_value,
MetricKind::Incremental,
)),
];

run_comparisons(counters, expected_counters);
}

#[test]
fn incremental_counter() {
let first_value = 3.14;
let second_value = 8.675309;

let counters = vec![
get_counter(first_value, MetricKind::Incremental),
get_counter(second_value, MetricKind::Incremental),
];

let expected_counters = counters
.clone()
.into_iter()
.map(Option::Some)
.collect::<Vec<_>>();

run_comparisons(counters, expected_counters);
}

#[test]
fn mixed_counter() {
let first_value = 3.14;
let second_value = 8.675309;
let third_value = 16.19;

let counters = vec![
get_counter(first_value, MetricKind::Incremental),
get_counter(second_value, MetricKind::Absolute),
get_counter(third_value, MetricKind::Absolute),
get_counter(first_value, MetricKind::Absolute),
get_counter(second_value, MetricKind::Incremental),
get_counter(third_value, MetricKind::Incremental),
];

let expected_counters = vec![
Some(get_counter(first_value, MetricKind::Incremental)),
None,
Some(get_counter(
third_value - second_value,
MetricKind::Incremental,
)),
None,
Some(get_counter(second_value, MetricKind::Incremental)),
Some(get_counter(third_value, MetricKind::Incremental)),
];

run_comparisons(counters, expected_counters);
}

#[test]
fn absolute_gauge() {
let first_value = 3.14;
let second_value = 8.675309;

let gauges = vec![
get_gauge(first_value, MetricKind::Absolute),
get_gauge(second_value, MetricKind::Absolute),
];

let expected_gauges = gauges
.clone()
.into_iter()
.map(Option::Some)
.collect::<Vec<_>>();

run_comparisons(gauges, expected_gauges);
}

#[test]
fn incremental_gauge() {
let first_value = 3.14;
let second_value = 8.675309;

let gauges = vec![
get_gauge(first_value, MetricKind::Incremental),
get_gauge(second_value, MetricKind::Incremental),
];

let expected_gauges = vec![
Some(get_gauge(first_value, MetricKind::Absolute)),
Some(get_gauge(first_value + second_value, MetricKind::Absolute)),
];

run_comparisons(gauges, expected_gauges);
}

#[test]
fn mixed_gauge() {
let first_value = 3.14;
let second_value = 8.675309;
let third_value = 16.19;

let gauges = vec![
get_gauge(first_value, MetricKind::Incremental),
get_gauge(second_value, MetricKind::Absolute),
get_gauge(third_value, MetricKind::Absolute),
get_gauge(first_value, MetricKind::Absolute),
get_gauge(second_value, MetricKind::Incremental),
get_gauge(third_value, MetricKind::Incremental),
];

let expected_gauges = vec![
Some(get_gauge(first_value, MetricKind::Absolute)),
Some(get_gauge(second_value, MetricKind::Absolute)),
Some(get_gauge(third_value, MetricKind::Absolute)),
Some(get_gauge(first_value, MetricKind::Absolute)),
Some(get_gauge(first_value + second_value, MetricKind::Absolute)),
Some(get_gauge(
first_value + second_value + third_value,
MetricKind::Absolute,
)),
];

run_comparisons(gauges, expected_gauges);
}

#[test]
fn other_metrics() {
let metric = Metric::new(
"set",
MetricKind::Incremental,
MetricValue::Set {
values: BTreeSet::new(),
},
);

run_comparisons(vec![metric], vec![None]);
}
}
15 changes: 13 additions & 2 deletions src/sinks/appsignal/sink.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use futures::{stream::BoxStream, StreamExt};
use futures_util::future::ready;
use tower::{Service, ServiceBuilder};
use vector_core::{
event::Event,
Expand All @@ -7,12 +8,14 @@ use vector_core::{
};

use crate::{
codecs::Transformer, internal_events::SinkRequestBuildError,
sinks::util::builder::SinkBuilderExt, sinks::util::Compression,
codecs::Transformer,
internal_events::SinkRequestBuildError,
sinks::util::{buffer::metrics::MetricNormalizer, builder::SinkBuilderExt, Compression},
};

use super::{
encoder::AppsignalEncoder,
normalizer::AppsignalMetricsNormalizer,
request_builder::{AppsignalRequest, AppsignalRequestBuilder},
};

Expand All @@ -32,8 +35,16 @@ where
{
pub(super) async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
let service = ServiceBuilder::new().service(self.service);
let mut normalizer = MetricNormalizer::<AppsignalMetricsNormalizer>::default();

input
.filter_map(move |event| {
ready(if let Event::Metric(metric) = event {
normalizer.normalize(metric).map(Event::Metric)
} else {
Some(event)
})
})
.batched(self.batch_settings.into_byte_size_config())
.request_builder(
None,
Expand Down