From d6edd128fae52d591f594db28d46cb6833a5364c Mon Sep 17 00:00:00 2001 From: Doug Smith Date: Tue, 3 Oct 2023 17:15:19 -0400 Subject: [PATCH 1/7] fix(datadog_metrics sink): improve aggregation performance --- .../vector/vector.toml | 2 +- src/sinks/datadog/metrics/sink.rs | 233 +++++++++--------- src/sinks/util/buffer/metrics/mod.rs | 2 - src/sinks/util/buffer/metrics/sort.rs | 67 ----- 4 files changed, 111 insertions(+), 193 deletions(-) delete mode 100644 src/sinks/util/buffer/metrics/sort.rs diff --git a/regression/cases/syslog_regex_logs2metric_ddmetrics/vector/vector.toml b/regression/cases/syslog_regex_logs2metric_ddmetrics/vector/vector.toml index b8a0cb4fbf558..b04171757296c 100644 --- a/regression/cases/syslog_regex_logs2metric_ddmetrics/vector/vector.toml +++ b/regression/cases/syslog_regex_logs2metric_ddmetrics/vector/vector.toml @@ -27,7 +27,7 @@ type = "log_to_metric" inputs = ["remap"] [[transforms.log2metric.metrics]] - type = "gauge" + type = "counter" field = "procid" tags.hostname = "{{ hostname }}" tags.facility = "{{ facility }}" diff --git a/src/sinks/datadog/metrics/sink.rs b/src/sinks/datadog/metrics/sink.rs index 5ceefc3c487d2..80d31a14afe4a 100644 --- a/src/sinks/datadog/metrics/sink.rs +++ b/src/sinks/datadog/metrics/sink.rs @@ -8,7 +8,6 @@ use futures_util::{ StreamExt, }; use tower::Service; -use vector_common::finalization::EventFinalizers; use vector_core::{ event::{Event, Metric, MetricValue}, partition::Partitioner, @@ -23,7 +22,6 @@ use super::{ use crate::{ internal_events::DatadogMetricsEncodingError, sinks::util::{ - buffer::metrics::sort::sort_for_compression, buffer::metrics::{AggregatedSummarySplitter, MetricSplitter}, SinkBuilderExt, }, @@ -103,15 +101,12 @@ where // Aggregate counters with identical timestamps, otherwise identical counters (same // series and same timestamp, when rounded to whole seconds) will be dropped in a // last-write-wins situation when they hit the DD metrics intake. + // + // This also sorts metrics by name, which significantly improves HTTP compression. .map(|((api_key, endpoint), metrics)| { - let collapsed_metrics = collapse_counters_by_series_and_timestamp(metrics); + let collapsed_metrics = sort_and_collapse_counters_by_series_and_timestamp(metrics); ((api_key, endpoint), collapsed_metrics) }) - // Sort metrics by name, which significantly improves HTTP compression. - .map(|((api_key, endpoint), mut metrics)| { - sort_for_compression(&mut metrics); - ((api_key, endpoint), metrics) - }) // We build our requests "incrementally", which means that for a single batch of metrics, we might generate // N requests to send them all, as Datadog has API-level limits on payload size, so we keep adding metrics // to a request until we reach the limit, and then create a new request, and so on and so forth, until all @@ -159,131 +154,72 @@ where } } -fn collapse_counters_by_series_and_timestamp(mut metrics: Vec) -> Vec { - // NOTE: Astute observers may recognize that this behavior could also be achieved by using - // `Vec::dedup_by`, but the clincher is that `dedup_by` requires a sorted vector to begin with. - // - // This function is designed to collapse duplicate counters even if the metrics are unsorted, - // which leads to a measurable boost in performance, being nearly 35% faster than `dedup_by` - // when the inputs are sorted, and up to 50% faster when the inputs are unsorted. 
- // - // These numbers are based on sorting a newtype wrapper around the metric instead of the metric - // itself, which does involve allocating a string in our tests. _However_, sorting the `Metric` - // directly is not possible without a customized `PartialOrd` implementation, as some of the - // nested fields containing `f64` values makes it underivable, and I'm not 100% sure that we - // could/would want to have a narrowly-focused impl of `PartialOrd` on `Metric` to fit this use - // case (metric type -> metric name -> metric timestamp, nothing else) vs being able to sort - // metrics by name first, etc. Then there's the potential issue of the reordering of fields - // changing the ordering behavior of `Metric`... and it just felt easier to write this tailored - // algorithm for the use case at hand. - let mut idx = 0; +/// Collapses counters by series and timestamp, leaving all other metrics unmodified. +/// The return value is sorted by metric series, which is desirable for compression. A sorted vector +/// tends to compress better than a random ordering by 2-3x (JSON encoded, deflate algorithm). +/// +/// Note that the time complexity of this function is O(nlogn) and the space complexity is O(1). +/// If needed, we can trade space for time by using a HashMap, which would be O(n) time and O(n) space. +fn sort_and_collapse_counters_by_series_and_timestamp(mut metrics: Vec) -> Vec { let now_ts = Utc::now().timestamp(); - // For each metric, see if it's a counter. If so, we check the rest of the metrics - // _after_ it to see if they share the same series _and_ timestamp, when converted - // to a Unix timestamp. If they match, we take that counter's value and merge it - // with our "current" counter metric, and then drop the secondary one from the - // vector. - // - // For any non-counter, we simply ignore it and leave it as-is. - while idx < metrics.len() { - let curr_idx = idx; - let counter_ts = match metrics[curr_idx].value() { - MetricValue::Counter { .. } => metrics[curr_idx] - .data() - .timestamp() - .map(|dt| dt.timestamp()) - .unwrap_or(now_ts), - // If it's not a counter, we can skip it. - _ => { - idx += 1; - continue; - } + // Sort by series and timestamp which is required for the below dedupe to behave as desired. + // This also tends to compress better than a random ordering by 2-3x (JSON encoded, deflate algorithm). + metrics.sort_unstable_by(|a, b| { + ( + a.series(), + a.timestamp().map(|dt| dt.timestamp()).unwrap_or(now_ts), + ) + .cmp(&( + b.series(), + b.timestamp().map(|dt| dt.timestamp()).unwrap_or(now_ts), + )) + }); + + metrics.dedup_by(|left, right| { + // Only aggregate counters. All other types can be skipped. + let MetricValue::Counter { value: left_value } = left.value() else { + return false; }; - - let mut accumulated_value = 0.0; - let mut accumulated_finalizers = EventFinalizers::default(); - - // Now go through each metric _after_ the current one to see if it matches the - // current metric: is a counter, with the same name and timestamp. If it is, we - // accumulate its value and then remove it. - // - // Otherwise, we skip it. 
- let mut is_disjoint = false; - let mut had_match = false; - let mut inner_idx = curr_idx + 1; - while inner_idx < metrics.len() { - let mut should_advance = true; - if let MetricValue::Counter { value } = metrics[inner_idx].value() { - let other_counter_ts = metrics[inner_idx] - .data() - .timestamp() - .map(|dt| dt.timestamp()) - .unwrap_or(now_ts); - if metrics[curr_idx].series() == metrics[inner_idx].series() - && counter_ts == other_counter_ts - { - had_match = true; - - // Collapse this counter by accumulating its value, and its - // finalizers, and removing it from the original vector of metrics. - accumulated_value += *value; - - let mut old_metric = metrics.swap_remove(inner_idx); - accumulated_finalizers.merge(old_metric.metadata_mut().take_finalizers()); - should_advance = false; - } else { - // We hit a counter that _doesn't_ match, but we can't just skip - // it because we also need to evaluate it against all the - // counters that come after it, so we only increment the index - // for this inner loop. - // - // As well, we mark ourselves to stop incrementing the outer - // index if we find more counters to accumulate, because we've - // hit a disjoint counter here. While we may be continuing to - // shrink the count of remaining metrics from accumulating, - // we have to ensure this counter we just visited is visited by - // the outer loop. - is_disjoint = true; - } - } - - if should_advance { - inner_idx += 1; - - if !is_disjoint { - idx += 1; - } - } + if !matches!(right.value(), MetricValue::Counter { .. }) { + return false; + } + if left.series() != right.series() { + return false; } - // If we had matches during the accumulator phase, update our original counter. - if had_match { - let metric = metrics.get_mut(curr_idx).expect("current index must exist"); - match metric.value_mut() { - MetricValue::Counter { value } => { - *value += accumulated_value; - metric - .metadata_mut() - .merge_finalizers(accumulated_finalizers); - } - _ => unreachable!("current index must represent a counter"), - } + let left_ts = left.timestamp().map(|dt| dt.timestamp()).unwrap_or(now_ts); + let right_ts = right.timestamp().map(|dt| dt.timestamp()).unwrap_or(now_ts); + if left_ts != right_ts { + return false; } - idx += 1; - } + // If we reach this point, the counters are for the same series and have the same timestamp. + let MetricValue::Counter { value: right_value } = right.value_mut() else { + return false; + }; + // NOTE: The docs for `dedup_by` specify that if `left`/`right` are equal, then + // `left` is the element that gets removed. 
+ *right_value += left_value; + right + .metadata_mut() + .merge_finalizers(left.metadata_mut().take_finalizers()); + + true + }); metrics } #[cfg(test)] mod tests { + use std::time::Duration; + use chrono::{DateTime, Utc}; use proptest::prelude::*; use vector_core::event::{Metric, MetricKind, MetricValue}; - use super::collapse_counters_by_series_and_timestamp; + use super::sort_and_collapse_counters_by_series_and_timestamp; fn arb_collapsible_metrics() -> impl Strategy> { let ts = Utc::now(); @@ -315,7 +251,7 @@ mod tests { fn collapse_no_metrics() { let input = Vec::new(); let expected = input.clone(); - let actual = collapse_counters_by_series_and_timestamp(input); + let actual = sort_and_collapse_counters_by_series_and_timestamp(input); assert_eq!(expected, actual); } @@ -324,7 +260,7 @@ mod tests { fn collapse_single_metric() { let input = vec![create_counter("basic", 42.0)]; let expected = input.clone(); - let actual = collapse_counters_by_series_and_timestamp(input); + let actual = sort_and_collapse_counters_by_series_and_timestamp(input); assert_eq!(expected, actual); } @@ -333,7 +269,7 @@ mod tests { fn collapse_identical_metrics_gauge() { let input = vec![create_gauge("basic", 42.0), create_gauge("basic", 42.0)]; let expected = input.clone(); - let actual = collapse_counters_by_series_and_timestamp(input); + let actual = sort_and_collapse_counters_by_series_and_timestamp(input); assert_eq!(expected, actual); @@ -348,7 +284,7 @@ mod tests { create_gauge("basic", gauge_value), ]; let expected = input.clone(); - let actual = collapse_counters_by_series_and_timestamp(input); + let actual = sort_and_collapse_counters_by_series_and_timestamp(input); assert_eq!(expected, actual); } @@ -368,7 +304,59 @@ mod tests { let expected_counter_value = input.len() as f64 * counter_value; let expected = vec![create_counter("basic", expected_counter_value)]; - let actual = collapse_counters_by_series_and_timestamp(input); + let actual = sort_and_collapse_counters_by_series_and_timestamp(input); + + assert_eq!(expected, actual); + } + + #[test] + fn collapse_identical_metrics_counter_unsorted() { + let gauge_value = 1.0; + let counter_value = 42.0; + let input = vec![ + create_counter("gauge", gauge_value), + create_counter("basic", counter_value), + create_counter("basic", counter_value), + create_counter("basic", counter_value), + create_counter("gauge", gauge_value), + create_counter("basic", counter_value), + create_counter("basic", counter_value), + create_counter("basic", counter_value), + create_counter("basic", counter_value), + ]; + + let expected_counter_value = input.len() as f64 * counter_value; + let expected = vec![ + create_counter("basic", expected_counter_value), + create_counter("gauge", gauge_value), + create_counter("gauge", gauge_value), + ]; + let actual = sort_and_collapse_counters_by_series_and_timestamp(input); + + assert_eq!(expected, actual); + } + + #[test] + fn collapse_identical_metrics_multiple_timestamps() { + let ts_1 = Utc::now() - Duration::from_secs(5); + let ts_2 = ts_1 - Duration::from_secs(5); + let counter_value = 42.0; + let input = vec![ + create_counter("basic", counter_value), + create_counter("basic", counter_value).with_timestamp(Some(ts_1)), + create_counter("basic", counter_value).with_timestamp(Some(ts_2)), + create_counter("basic", counter_value), + create_counter("basic", counter_value).with_timestamp(Some(ts_2)), + create_counter("basic", counter_value).with_timestamp(Some(ts_1)), + create_counter("basic", counter_value), + ]; + + let expected = 
vec![ + create_counter("basic", counter_value * 2.).with_timestamp(Some(ts_2)), + create_counter("basic", counter_value * 2.).with_timestamp(Some(ts_1)), + create_counter("basic", counter_value * 3.), + ]; + let actual = sort_and_collapse_counters_by_series_and_timestamp(input); assert_eq!(expected, actual); } @@ -419,8 +407,7 @@ mod tests { expected_output.sort_by_cached_key(MetricCollapseSort::from_metric); expected_output.dedup_by(collapse_dedup_fn); - let mut actual_output = collapse_counters_by_series_and_timestamp(input); - actual_output.sort_by_cached_key(MetricCollapseSort::from_metric); + let actual_output = sort_and_collapse_counters_by_series_and_timestamp(input); prop_assert_eq!(expected_output, actual_output); } diff --git a/src/sinks/util/buffer/metrics/mod.rs b/src/sinks/util/buffer/metrics/mod.rs index 877cdc9c4bcc1..e66b3c2364140 100644 --- a/src/sinks/util/buffer/metrics/mod.rs +++ b/src/sinks/util/buffer/metrics/mod.rs @@ -1,5 +1,3 @@ -pub mod sort; - use std::cmp::Ordering; use vector_core::event::metric::{Metric, MetricValue, Sample}; diff --git a/src/sinks/util/buffer/metrics/sort.rs b/src/sinks/util/buffer/metrics/sort.rs deleted file mode 100644 index feaa563493789..0000000000000 --- a/src/sinks/util/buffer/metrics/sort.rs +++ /dev/null @@ -1,67 +0,0 @@ -use crate::event::Metric; - -/// Sorts metrics in an order that is likely to achieve good compression. -pub fn sort_for_compression(metrics: &mut [Metric]) { - // This just sorts by series today. This tends to compress better than a random ordering by - // 2-3x (JSON encoded, deflate algorithm) - metrics.sort_unstable_by(|a, b| a.series().cmp(b.series())) -} - -#[cfg(test)] -mod test { - use crate::event::MetricValue; - use rand::prelude::SliceRandom; - use rand::thread_rng; - use vector_core::event::{Metric, MetricKind}; - use vector_core::metric_tags; - - // This just ensures the sorting does not change. `sort_for_compression` relies on - // the default `PartialOrd` on `MetricSeries`. 
- #[test] - fn test_compression_order() { - let sorted_metrics = vec![ - Metric::new( - "metric_1", - MetricKind::Absolute, - MetricValue::Gauge { value: 0.0 }, - ), - Metric::new( - "metric_2", - MetricKind::Incremental, - MetricValue::Gauge { value: 0.0 }, - ), - Metric::new( - "metric_3", - MetricKind::Absolute, - MetricValue::Gauge { value: 0.0 }, - ) - .with_tags(Some(metric_tags!("z" => "z"))), - Metric::new( - "metric_4", - MetricKind::Absolute, - MetricValue::Gauge { value: 0.0 }, - ) - .with_tags(Some(metric_tags!("a" => "a"))), - Metric::new( - "metric_4", - MetricKind::Absolute, - MetricValue::Gauge { value: 0.0 }, - ) - .with_tags(Some(metric_tags!( - "a" => "a", - "b" => "b", - ))), - Metric::new( - "metric_4", - MetricKind::Absolute, - MetricValue::Gauge { value: 0.0 }, - ) - .with_tags(Some(metric_tags!("b" => "b"))), - ]; - - let mut rand_metrics = sorted_metrics.clone(); - rand_metrics.shuffle(&mut thread_rng()); - super::sort_for_compression(&mut rand_metrics); - assert_eq!(sorted_metrics, rand_metrics); - } -} From 8a9f9bff6b332a3266324c094711dd599a4d5bae Mon Sep 17 00:00:00 2001 From: Doug Smith Date: Wed, 4 Oct 2023 10:41:48 -0400 Subject: [PATCH 2/7] fix tests and add concurrency --- src/sinks/datadog/metrics/sink.rs | 27 +++++++++++++++++---------- src/sinks/util/builder.rs | 6 +++--- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/src/sinks/datadog/metrics/sink.rs b/src/sinks/datadog/metrics/sink.rs index 80d31a14afe4a..ba36abb83eb61 100644 --- a/src/sinks/datadog/metrics/sink.rs +++ b/src/sinks/datadog/metrics/sink.rs @@ -23,6 +23,7 @@ use crate::{ internal_events::DatadogMetricsEncodingError, sinks::util::{ buffer::metrics::{AggregatedSummarySplitter, MetricSplitter}, + request_builder::default_request_builder_concurrency_limit, SinkBuilderExt, }, }; @@ -103,10 +104,16 @@ where // last-write-wins situation when they hit the DD metrics intake. // // This also sorts metrics by name, which significantly improves HTTP compression. - .map(|((api_key, endpoint), metrics)| { - let collapsed_metrics = sort_and_collapse_counters_by_series_and_timestamp(metrics); - ((api_key, endpoint), collapsed_metrics) - }) + .concurrent_map( + default_request_builder_concurrency_limit(), + |((api_key, endpoint), metrics)| { + Box::pin(async move { + let collapsed_metrics = + sort_and_collapse_counters_by_series_and_timestamp(metrics); + ((api_key, endpoint), collapsed_metrics) + }) + }, + ) // We build our requests "incrementally", which means that for a single batch of metrics, we might generate // N requests to send them all, as Datadog has API-level limits on payload size, so we keep adding metrics // to a request until we reach the limit, and then create a new request, and so on and so forth, until all @@ -158,7 +165,7 @@ where /// The return value is sorted by metric series, which is desirable for compression. A sorted vector /// tends to compress better than a random ordering by 2-3x (JSON encoded, deflate algorithm). /// -/// Note that the time complexity of this function is O(nlogn) and the space complexity is O(1). +/// Note that the time complexity of this function is O(n log n) and the space complexity is O(1). /// If needed, we can trade space for time by using a HashMap, which would be O(n) time and O(n) space. 
fn sort_and_collapse_counters_by_series_and_timestamp(mut metrics: Vec) -> Vec { let now_ts = Utc::now().timestamp(); @@ -314,22 +321,22 @@ mod tests { let gauge_value = 1.0; let counter_value = 42.0; let input = vec![ - create_counter("gauge", gauge_value), + create_gauge("gauge", gauge_value), create_counter("basic", counter_value), create_counter("basic", counter_value), create_counter("basic", counter_value), - create_counter("gauge", gauge_value), + create_gauge("gauge", gauge_value), create_counter("basic", counter_value), create_counter("basic", counter_value), create_counter("basic", counter_value), create_counter("basic", counter_value), ]; - let expected_counter_value = input.len() as f64 * counter_value; + let expected_counter_value = (input.len() - 2) as f64 * counter_value; let expected = vec![ create_counter("basic", expected_counter_value), - create_counter("gauge", gauge_value), - create_counter("gauge", gauge_value), + create_gauge("gauge", gauge_value), + create_gauge("gauge", gauge_value), ]; let actual = sort_and_collapse_counters_by_series_and_timestamp(input); diff --git a/src/sinks/util/builder.rs b/src/sinks/util/builder.rs index c51bf405dc4d1..617697ea84ec1 100644 --- a/src/sinks/util/builder.rs +++ b/src/sinks/util/builder.rs @@ -82,13 +82,13 @@ pub trait SinkBuilderExt: Stream { /// /// If the spawned future panics, the panic will be carried through and resumed on the task /// calling the stream. - fn concurrent_map(self, limit: Option, f: F) -> ConcurrentMap + fn concurrent_map(self, limit: NonZeroUsize, f: F) -> ConcurrentMap where Self: Sized, F: Fn(Self::Item) -> Pin + Send + 'static>> + Send + 'static, T: Send + 'static, { - ConcurrentMap::new(self, limit, f) + ConcurrentMap::new(self, Some(limit), f) } /// Constructs a [`Stream`] which transforms the input into a request suitable for sending to @@ -114,7 +114,7 @@ pub trait SinkBuilderExt: Stream { { let builder = Arc::new(builder); - self.concurrent_map(Some(limit), move |input| { + self.concurrent_map(limit, move |input| { let builder = Arc::clone(&builder); Box::pin(async move { From aca62781c91312232d03daad3b7a563ba36ed96d Mon Sep 17 00:00:00 2001 From: Doug Smith Date: Wed, 4 Oct 2023 16:44:03 -0400 Subject: [PATCH 3/7] feedback --- src/sinks/datadog/metrics/sink.rs | 75 ++++++++++++++++++++++--------- 1 file changed, 55 insertions(+), 20 deletions(-) diff --git a/src/sinks/datadog/metrics/sink.rs b/src/sinks/datadog/metrics/sink.rs index ba36abb83eb61..fc95e1d2172fd 100644 --- a/src/sinks/datadog/metrics/sink.rs +++ b/src/sinks/datadog/metrics/sink.rs @@ -172,6 +172,7 @@ fn sort_and_collapse_counters_by_series_and_timestamp(mut metrics: Vec) // Sort by series and timestamp which is required for the below dedupe to behave as desired. // This also tends to compress better than a random ordering by 2-3x (JSON encoded, deflate algorithm). + // Note that `sort_unstable_by_key` would be simpler but results in lifetime errors without cloning. metrics.sort_unstable_by(|a, b| { ( a.series(), @@ -183,14 +184,8 @@ fn sort_and_collapse_counters_by_series_and_timestamp(mut metrics: Vec) )) }); + // Aggregate counters that share the same series and timestamp. metrics.dedup_by(|left, right| { - // Only aggregate counters. All other types can be skipped. - let MetricValue::Counter { value: left_value } = left.value() else { - return false; - }; - if !matches!(right.value(), MetricValue::Counter { .. 
}) { - return false; - } if left.series() != right.series() { return false; } @@ -201,18 +196,23 @@ fn sort_and_collapse_counters_by_series_and_timestamp(mut metrics: Vec) return false; } - // If we reach this point, the counters are for the same series and have the same timestamp. - let MetricValue::Counter { value: right_value } = right.value_mut() else { - return false; - }; - // NOTE: The docs for `dedup_by` specify that if `left`/`right` are equal, then - // `left` is the element that gets removed. - *right_value += left_value; - right - .metadata_mut() - .merge_finalizers(left.metadata_mut().take_finalizers()); - - true + // Only aggregate counters. All other types can be skipped. + if let ( + MetricValue::Counter { value: left_value }, + MetricValue::Counter { value: right_value }, + ) = (left.value(), right.value_mut()) + { + // NOTE: The docs for `dedup_by` specify that if `left`/`right` are equal, then + // `left` is the element that gets removed. + *right_value += left_value; + right + .metadata_mut() + .merge_finalizers(left.metadata_mut().take_finalizers()); + + true + } else { + false + } }); metrics @@ -224,7 +224,10 @@ mod tests { use chrono::{DateTime, Utc}; use proptest::prelude::*; - use vector_core::event::{Metric, MetricKind, MetricValue}; + use vector_core::{ + event::{Metric, MetricKind, MetricValue}, + metric_tags, + }; use super::sort_and_collapse_counters_by_series_and_timestamp; @@ -368,6 +371,38 @@ mod tests { assert_eq!(expected, actual); } + #[test] + fn collapse_identical_metrics_with_tags() { + let counter_value = 42.0; + let input = vec![ + create_counter("basic", counter_value).with_tags(Some(metric_tags!("a" => "a"))), + create_counter("basic", counter_value).with_tags(Some(metric_tags!( + "a" => "a", + "b" => "b", + ))), + create_counter("basic", counter_value), + create_counter("basic", counter_value).with_tags(Some(metric_tags!( + "b" => "b", + "a" => "a", + ))), + create_counter("basic", counter_value), + create_counter("basic", counter_value), + create_counter("basic", counter_value).with_tags(Some(metric_tags!("a" => "a"))), + ]; + + let expected = vec![ + create_counter("basic", counter_value * 3.), + create_counter("basic", counter_value * 2.).with_tags(Some(metric_tags!("a" => "a"))), + create_counter("basic", counter_value * 2.).with_tags(Some(metric_tags!( + "a" => "a", + "b" => "b", + ))), + ]; + let actual = sort_and_collapse_counters_by_series_and_timestamp(input); + + assert_eq!(expected, actual); + } + #[derive(Eq, Ord, PartialEq, PartialOrd)] struct MetricCollapseSort { metric_type: &'static str, From 273a0655f0663ef044dafdc5c2cdae4f0ca1bb70 Mon Sep 17 00:00:00 2001 From: Doug Smith Date: Thu, 5 Oct 2023 09:03:24 -0400 Subject: [PATCH 4/7] add comment to dedupe_by --- src/sinks/datadog/metrics/sink.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/sinks/datadog/metrics/sink.rs b/src/sinks/datadog/metrics/sink.rs index fc95e1d2172fd..a73a9ea876739 100644 --- a/src/sinks/datadog/metrics/sink.rs +++ b/src/sinks/datadog/metrics/sink.rs @@ -185,6 +185,9 @@ fn sort_and_collapse_counters_by_series_and_timestamp(mut metrics: Vec) }); // Aggregate counters that share the same series and timestamp. + // While `coalesce` is semantically more fitting here than `dedupe_by`, we opt for the latter because + // they share the same functionality and `dedupe_by`'s implementation is more optimized, doing the + // operation in place. 
     metrics.dedup_by(|left, right| {
         if left.series() != right.series() {
             return false;
         }

From 472fdb4a2021c37cd9f5d95e86f887dd69fbdc59 Mon Sep 17 00:00:00 2001
From: Doug Smith
Date: Thu, 5 Oct 2023 10:24:44 -0400
Subject: [PATCH 5/7] sort by metric type as well

---
 src/sinks/datadog/metrics/sink.rs | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/sinks/datadog/metrics/sink.rs b/src/sinks/datadog/metrics/sink.rs
index a73a9ea876739..5375e8472d95f 100644
--- a/src/sinks/datadog/metrics/sink.rs
+++ b/src/sinks/datadog/metrics/sink.rs
@@ -175,10 +175,12 @@ fn sort_and_collapse_counters_by_series_and_timestamp(mut metrics: Vec)
     // Note that `sort_unstable_by_key` would be simpler but results in lifetime errors without cloning.
     metrics.sort_unstable_by(|a, b| {
         (
+            a.value().as_name(),
             a.series(),
             a.timestamp().map(|dt| dt.timestamp()).unwrap_or(now_ts),
         )
             .cmp(&(
+                b.value().as_name(),
                 b.series(),
                 b.timestamp().map(|dt| dt.timestamp()).unwrap_or(now_ts),
             ))

From e76e08c6e548f95d4d623e4d63c75a6530fdf156 Mon Sep 17 00:00:00 2001
From: Doug Smith
Date: Thu, 5 Oct 2023 13:57:22 -0400
Subject: [PATCH 6/7] fix flakey proptest

---
 src/sinks/datadog/metrics/sink.rs | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/sinks/datadog/metrics/sink.rs b/src/sinks/datadog/metrics/sink.rs
index 5375e8472d95f..b3e0fe7574b9c 100644
--- a/src/sinks/datadog/metrics/sink.rs
+++ b/src/sinks/datadog/metrics/sink.rs
@@ -225,7 +225,7 @@ fn sort_and_collapse_counters_by_series_and_timestamp(mut metrics: Vec)
 
 #[cfg(test)]
 mod tests {
-    use std::time::Duration;
+    use std::{collections::HashSet, time::Duration};
 
     use chrono::{DateTime, Utc};
     use proptest::prelude::*;
@@ -240,12 +240,14 @@ mod tests {
         let ts = Utc::now();
 
         any::>().prop_map(move |values| {
+            let mut unique_metrics = HashSet::new();
             values
                 .into_iter()
                 .map(|(id, value)| {
                     let name = format!("{}-{}", value.as_name(), id);
                     Metric::new(name, MetricKind::Incremental, value).with_timestamp(Some(ts))
                 })
+                .filter(|metric| unique_metrics.insert(metric.series().clone())) // Filter out duplicates
                 .collect()
         })
     }

From a1eb365f87cf431667f23b7948d75117378d1f72 Mon Sep 17 00:00:00 2001
From: Doug Smith
Date: Thu, 5 Oct 2023 14:24:14 -0400
Subject: [PATCH 7/7] only dedupe non-counters in prop test

---
 src/sinks/datadog/metrics/sink.rs | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/src/sinks/datadog/metrics/sink.rs b/src/sinks/datadog/metrics/sink.rs
index b3e0fe7574b9c..fccfe040fdd95 100644
--- a/src/sinks/datadog/metrics/sink.rs
+++ b/src/sinks/datadog/metrics/sink.rs
@@ -247,7 +247,12 @@ mod tests {
                     let name = format!("{}-{}", value.as_name(), id);
                     Metric::new(name, MetricKind::Incremental, value).with_timestamp(Some(ts))
                 })
-                .filter(|metric| unique_metrics.insert(metric.series().clone())) // Filter out duplicates
+                // Filter out duplicates other than counters. We do this to prevent false positives. False positives would occur
+                // because we don't collapse other metric types and we can't sort metrics by their values.
+                .filter(|metric| {
+                    matches!(metric.value(), MetricValue::Counter { .. })
+                        || unique_metrics.insert(metric.series().clone())
+                })
                 .collect()
         })
     }
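
As a standalone illustration of the pattern the patches above implement — sort so that collapsible counters become adjacent, then merge them in place with `Vec::dedup_by` — here is a minimal sketch. It is not part of the patch series: the `Counter` struct and `sort_and_collapse` function below are simplified stand-ins, invented for illustration, for Vector's `Metric` type and `sort_and_collapse_counters_by_series_and_timestamp`.

// Minimal, self-contained sketch; the `Counter` type and its fields are
// assumptions for illustration only, not Vector types.
#[derive(Debug)]
struct Counter {
    series: String, // stands in for the metric series (name + tags)
    ts: i64,        // timestamp in whole seconds
    value: f64,
}

fn sort_and_collapse(mut counters: Vec<Counter>) -> Vec<Counter> {
    // Sorting first guarantees that counters sharing a series and timestamp are
    // adjacent, which is all `dedup_by` needs to treat them as consecutive duplicates.
    counters.sort_unstable_by(|a, b| (&a.series, a.ts).cmp(&(&b.series, b.ts)));

    // `dedup_by` passes elements in reverse slice order: when the closure returns
    // true, `left` (the later element) is removed, so its value is folded into
    // `right`, the element that is kept.
    counters.dedup_by(|left, right| {
        if left.series == right.series && left.ts == right.ts {
            right.value += left.value;
            true
        } else {
            false
        }
    });

    counters
}

fn main() {
    let input = vec![
        Counter { series: "requests".into(), ts: 1, value: 1.0 },
        Counter { series: "errors".into(), ts: 1, value: 2.0 },
        Counter { series: "requests".into(), ts: 1, value: 3.0 },
    ];
    let out = sort_and_collapse(input);
    // Sorted by series; the two "requests" counters collapse into one with value 4.0.
    assert_eq!(out.len(), 2);
    println!("{out:?}");
}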