diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md
index 7bc06fea6d..da91a7151f 100644
--- a/opentelemetry-sdk/CHANGELOG.md
+++ b/opentelemetry-sdk/CHANGELOG.md
@@ -46,6 +46,10 @@
 - `LoggerProviderInner` is no longer `pub (crate)`
 - `Logger.provider()` now returns `&LoggerProvider` instead of an `Option`
 
+- [#1519](https://github.com/open-telemetry/opentelemetry-rust/pull/1519) Performance improvements
+  when calling `Counter::add()` and `UpDownCounter::add()` with an empty set of attributes
+  (e.g. `counter.add(5, &[])`)
+
 ### Fixed
 
 - [#1481](https://github.com/open-telemetry/opentelemetry-rust/pull/1481) Fix error message caused by race condition when using PeriodicReader
diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs
index c111214fac..92bc3d947f 100644
--- a/opentelemetry-sdk/src/metrics/internal/mod.rs
+++ b/opentelemetry-sdk/src/metrics/internal/mod.rs
@@ -6,10 +6,26 @@ mod sum;
 
 use core::fmt;
 use std::ops::{Add, AddAssign, Sub};
+use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
+use std::sync::Mutex;
 
 pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure};
 pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE};
 
+/// Marks a type that can have a value added and retrieved atomically. Required since
+/// different types have different backing atomic mechanisms
+pub(crate) trait AtomicTracker<T>: Sync + Send + 'static {
+    fn add(&self, value: T);
+    fn get_value(&self) -> T;
+    fn get_and_reset_value(&self) -> T;
+}
+
+/// Marks a type that can have an atomic tracker generated for it
+pub(crate) trait AtomicallyUpdate<T> {
+    type AtomicTracker: AtomicTracker<T>;
+    fn new_atomic_tracker() -> Self::AtomicTracker;
+}
+
 pub(crate) trait Number<T>:
     Add<Output = T>
     + AddAssign
@@ -23,6 +39,7 @@ pub(crate) trait Number<T>:
     + Send
     + Sync
     + 'static
+    + AtomicallyUpdate<T>
 {
     fn min() -> Self;
     fn max() -> Self;
@@ -71,3 +88,159 @@ impl Number<f64> for f64 {
         self
     }
 }
+
+impl AtomicTracker<u64> for AtomicU64 {
+    fn add(&self, value: u64) {
+        self.fetch_add(value, Ordering::Relaxed);
+    }
+
+    fn get_value(&self) -> u64 {
+        self.load(Ordering::Relaxed)
+    }
+
+    fn get_and_reset_value(&self) -> u64 {
+        self.swap(0, Ordering::Relaxed)
+    }
+}
+
+impl AtomicallyUpdate<u64> for u64 {
+    type AtomicTracker = AtomicU64;
+
+    fn new_atomic_tracker() -> Self::AtomicTracker {
+        AtomicU64::new(0)
+    }
+}
+
+impl AtomicTracker<i64> for AtomicI64 {
+    fn add(&self, value: i64) {
+        self.fetch_add(value, Ordering::Relaxed);
+    }
+
+    fn get_value(&self) -> i64 {
+        self.load(Ordering::Relaxed)
+    }
+
+    fn get_and_reset_value(&self) -> i64 {
+        self.swap(0, Ordering::Relaxed)
+    }
+}
+
+impl AtomicallyUpdate<i64> for i64 {
+    type AtomicTracker = AtomicI64;
+
+    fn new_atomic_tracker() -> Self::AtomicTracker {
+        AtomicI64::new(0)
+    }
+}
+
+pub(crate) struct F64AtomicTracker {
+    inner: Mutex<f64>, // Floating points don't have true atomics, so we need to use mutex for them
+}
+
+impl F64AtomicTracker {
+    fn new() -> Self {
+        F64AtomicTracker {
+            inner: Mutex::new(0.0),
+        }
+    }
+}
+
+impl AtomicTracker<f64> for F64AtomicTracker {
+    fn add(&self, value: f64) {
+        let mut guard = self.inner.lock().expect("F64 mutex was poisoned");
+        *guard += value;
+    }
+
+    fn get_value(&self) -> f64 {
+        let guard = self.inner.lock().expect("F64 mutex was poisoned");
+        *guard
+    }
+
+    fn get_and_reset_value(&self) -> f64 {
+        let mut guard = self.inner.lock().expect("F64 mutex was poisoned");
+        let value = *guard;
+        *guard = 0.0;
+
+        value
+    }
+}
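+
+// NOTE: a lock-free alternative here would be to pack the f64 bit pattern
+// into an AtomicU64 (via `f64::to_bits`/`f64::from_bits`) and update it with
+// a compare-exchange loop; the Mutex is the simpler, clearly-correct choice.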
+impl AtomicallyUpdate<f64> for f64 {
+    type AtomicTracker = F64AtomicTracker;
+
+    fn new_atomic_tracker() -> Self::AtomicTracker {
+        F64AtomicTracker::new()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn can_add_and_get_u64_atomic_value() {
+        let atomic = u64::new_atomic_tracker();
+        atomic.add(15);
+        atomic.add(10);
+
+        let value = atomic.get_value();
+        assert_eq!(value, 25);
+    }
+
+    #[test]
+    fn can_reset_u64_atomic_value() {
+        let atomic = u64::new_atomic_tracker();
+        atomic.add(15);
+
+        let value = atomic.get_and_reset_value();
+        let value2 = atomic.get_value();
+
+        assert_eq!(value, 15, "Incorrect first value");
+        assert_eq!(value2, 0, "Incorrect second value");
+    }
+
+    #[test]
+    fn can_add_and_get_i64_atomic_value() {
+        let atomic = i64::new_atomic_tracker();
+        atomic.add(15);
+        atomic.add(-10);
+
+        let value = atomic.get_value();
+        assert_eq!(value, 5);
+    }
+
+    #[test]
+    fn can_reset_i64_atomic_value() {
+        let atomic = i64::new_atomic_tracker();
+        atomic.add(15);
+
+        let value = atomic.get_and_reset_value();
+        let value2 = atomic.get_value();
+
+        assert_eq!(value, 15, "Incorrect first value");
+        assert_eq!(value2, 0, "Incorrect second value");
+    }
+
+    #[test]
+    fn can_add_and_get_f64_atomic_value() {
+        let atomic = f64::new_atomic_tracker();
+        atomic.add(15.3);
+        atomic.add(10.4);
+
+        let value = atomic.get_value();
+
+        assert!(f64::abs(25.7 - value) < 0.0001);
+    }
+
+    #[test]
+    fn can_reset_f64_atomic_value() {
+        let atomic = f64::new_atomic_tracker();
+        atomic.add(15.5);
+
+        let value = atomic.get_and_reset_value();
+        let value2 = atomic.get_value();
+
+        assert!(f64::abs(15.5 - value) < 0.0001, "Incorrect first value");
+        assert!(f64::abs(0.0 - value2) < 0.0001, "Incorrect second value");
+    }
+}
diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs
index 3fac77c459..83a6b07858 100644
--- a/opentelemetry-sdk/src/metrics/internal/sum.rs
+++ b/opentelemetry-sdk/src/metrics/internal/sum.rs
@@ -1,3 +1,4 @@
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::{
     collections::{hash_map::Entry, HashMap},
     sync::Mutex,
@@ -10,26 +11,39 @@ use opentelemetry::{global, metrics::MetricsError};
 
 use super::{
     aggregate::{is_under_cardinality_limit, STREAM_OVERFLOW_ATTRIBUTE_SET},
-    Number,
+    AtomicTracker, Number,
 };
 
 /// The storage for sums.
-#[derive(Default)]
 struct ValueMap<T: Number<T>> {
     values: Mutex<HashMap<AttributeSet, T>>,
+    has_no_value_attribute_value: AtomicBool,
+    no_attribute_value: T::AtomicTracker,
+}
+
+impl<T: Number<T>> Default for ValueMap<T> {
+    fn default() -> Self {
+        ValueMap::new()
+    }
 }
 
 impl<T: Number<T>> ValueMap<T> {
     fn new() -> Self {
         ValueMap {
             values: Mutex::new(HashMap::new()),
+            has_no_value_attribute_value: AtomicBool::new(false),
+            no_attribute_value: T::new_atomic_tracker(),
         }
     }
 }
 
 impl<T: Number<T>> ValueMap<T> {
     fn measure(&self, measurement: T, attrs: AttributeSet) {
-        if let Ok(mut values) = self.values.lock() {
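+        // Fast path: an empty attribute set updates a dedicated atomic slot,
+        // so `counter.add(5, &[])` never touches the HashMap or its Mutex.
+        // The Release store pairs with the Acquire/AcqRel reads in collect.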
+        if attrs.is_empty() {
+            self.no_attribute_value.add(measurement);
+            self.has_no_value_attribute_value
+                .store(true, Ordering::Release);
+        } else if let Ok(mut values) = self.values.lock() {
             let size = values.len();
             match values.entry(attrs) {
                 Entry::Occupied(mut occupied_entry) => {
@@ -103,7 +117,7 @@ impl<T: Number<T>> Sum<T> {
             Err(_) => return (0, None),
         };
 
-        let n = values.len();
+        let n = values.len() + 1;
         if n > s_data.data_points.capacity() {
             s_data
                 .data_points
@@ -111,6 +125,20 @@ impl<T: Number<T>> Sum<T> {
         }
         let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);
 
+        if self
+            .value_map
+            .has_no_value_attribute_value
+            .swap(false, Ordering::AcqRel)
+        {
+            s_data.data_points.push(DataPoint {
+                attributes: AttributeSet::default(),
+                start_time: Some(prev_start),
+                time: Some(t),
+                value: self.value_map.no_attribute_value.get_and_reset_value(),
+                exemplars: vec![],
+            });
+        }
+
         for (attrs, value) in values.drain() {
             s_data.data_points.push(DataPoint {
                 attributes: attrs,
@@ -126,7 +154,10 @@ impl<T: Number<T>> Sum<T> {
             *start = t;
         }
 
-        (n, new_agg.map(|a| Box::new(a) as Box<_>))
+        (
+            s_data.data_points.len(),
+            new_agg.map(|a| Box::new(a) as Box<_>),
+        )
     }
 
     pub(crate) fn cumulative(
@@ -155,7 +186,7 @@ impl<T: Number<T>> Sum<T> {
             Err(_) => return (0, None),
         };
 
-        let n = values.len();
+        let n = values.len() + 1;
         if n > s_data.data_points.capacity() {
             s_data
                 .data_points
@@ -163,6 +194,21 @@ impl<T: Number<T>> Sum<T> {
         }
         let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);
+
+        if self
+            .value_map
+            .has_no_value_attribute_value
+            .load(Ordering::Acquire)
+        {
+            s_data.data_points.push(DataPoint {
+                attributes: AttributeSet::default(),
+                start_time: Some(prev_start),
+                time: Some(t),
+                value: self.value_map.no_attribute_value.get_value(),
+                exemplars: vec![],
+            });
+        }
+
         // TODO: This will use an unbounded amount of memory if there
         // are unbounded number of attribute sets being aggregated. Attribute
         // sets that become "stale" need to be forgotten so this will not
@@ -177,7 +223,10 @@ impl<T: Number<T>> Sum<T> {
             });
         }
 
-        (n, new_agg.map(|a| Box::new(a) as Box<_>))
+        (
+            s_data.data_points.len(),
+            new_agg.map(|a| Box::new(a) as Box<_>),
+        )
     }
 }
@@ -230,7 +279,7 @@ impl<T: Number<T>> PrecomputedSum<T> {
             Err(_) => return (0, None),
         };
 
-        let n = values.len();
+        let n = values.len() + 1;
         if n > s_data.data_points.capacity() {
             s_data
                 .data_points
@@ -242,6 +291,20 @@ impl<T: Number<T>> PrecomputedSum<T> {
             Err(_) => return (0, None),
         };
 
+        if self
+            .value_map
+            .has_no_value_attribute_value
+            .swap(false, Ordering::AcqRel)
+        {
+            s_data.data_points.push(DataPoint {
+                attributes: AttributeSet::default(),
+                start_time: Some(prev_start),
+                time: Some(t),
+                value: self.value_map.no_attribute_value.get_and_reset_value(),
+                exemplars: vec![],
+            });
+        }
+
         let default = T::default();
         for (attrs, value) in values.drain() {
             let delta = value - *reported.get(&attrs).unwrap_or(&default);
@@ -265,7 +328,10 @@ impl<T: Number<T>> PrecomputedSum<T> {
         *reported = new_reported;
         drop(reported); // drop before values guard is dropped
 
-        (n, new_agg.map(|a| Box::new(a) as Box<_>))
+        (
+            s_data.data_points.len(),
+            new_agg.map(|a| Box::new(a) as Box<_>),
+        )
     }
 
     pub(crate) fn cumulative(
@@ -295,7 +361,7 @@ impl<T: Number<T>> PrecomputedSum<T> {
             Err(_) => return (0, None),
         };
 
-        let n = values.len();
+        let n = values.len() + 1;
         if n > s_data.data_points.capacity() {
             s_data
                 .data_points
@@ -307,6 +373,20 @@ impl<T: Number<T>> PrecomputedSum<T> {
             Err(_) => return (0, None),
         };
 
+        if self
+            .value_map
+            .has_no_value_attribute_value
+            .load(Ordering::Acquire)
+        {
+            s_data.data_points.push(DataPoint {
+                attributes: AttributeSet::default(),
+                start_time: Some(prev_start),
+                time: Some(t),
+                value: self.value_map.no_attribute_value.get_value(),
+                exemplars: vec![],
+            });
+        }
+
         let default = T::default();
         for (attrs, value) in values.iter() {
             let delta = *value - *reported.get(attrs).unwrap_or(&default);
@@ -325,6 +405,9 @@ impl<T: Number<T>> PrecomputedSum<T> {
         *reported = new_reported;
         drop(reported); // drop before values guard is dropped
 
-        (n, new_agg.map(|a| Box::new(a) as Box<_>))
+        (
+            s_data.data_points.len(),
+            new_agg.map(|a| Box::new(a) as Box<_>),
+        )
     }
 }
diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs
index 021ee3d469..96e35aeba4 100644
--- a/opentelemetry-sdk/src/metrics/mod.rs
+++ b/opentelemetry-sdk/src/metrics/mod.rs
@@ -61,7 +61,11 @@ pub use view::*;
 #[cfg(all(test, feature = "testing"))]
 mod tests {
     use super::*;
+    use crate::metrics::data::{ResourceMetrics, Temporality};
+    use crate::metrics::reader::TemporalitySelector;
+    use crate::testing::metrics::InMemoryMetricsExporterBuilder;
     use crate::{runtime, testing::metrics::InMemoryMetricsExporter};
+    use opentelemetry::metrics::{Counter, UpDownCounter};
     use opentelemetry::{
         metrics::{MeterProvider as _, Unit},
         KeyValue,
@@ -428,4 +432,260 @@ mod tests {
         let data_point = &sum.data_points[0];
         assert_eq!(data_point.value, 30);
     }
+
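+    // The tests below exercise the no-attribute fast path through full
+    // flush/export cycles, using the `TestContext` helper defined at the
+    // bottom of this module.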
cumulative" + ); + + let data_point = &sum.data_points[0]; + assert!(data_point.attributes.is_empty(), "Non-empty attribute set"); + assert_eq!(data_point.value, 50, "Unexpected data point value"); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn no_attr_delta_counter() { + let mut test_context = TestContext::new(Some(Temporality::Delta)); + let counter = test_context.u64_counter("test", "my_counter", "my_unit"); + + counter.add(50, &[]); + test_context.flush_metrics(); + + let sum = test_context.get_aggregation::>("my_counter", "my_unit"); + + assert_eq!(sum.data_points.len(), 1, "Expected only one data point"); + assert!(sum.is_monotonic, "Should produce monotonic."); + assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta"); + + let data_point = &sum.data_points[0]; + assert!(data_point.attributes.is_empty(), "Non-empty attribute set"); + assert_eq!(data_point.value, 50, "Unexpected data point value"); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn no_attr_cumulative_up_down_counter() { + let mut test_context = TestContext::new(Some(Temporality::Cumulative)); + let counter = test_context.i64_up_down_counter("test", "my_counter", "my_unit"); + + counter.add(50, &[]); + test_context.flush_metrics(); + + let sum = test_context.get_aggregation::>("my_counter", "my_unit"); + + assert_eq!(sum.data_points.len(), 1, "Expected only one data point"); + assert!(!sum.is_monotonic, "Should not produce monotonic."); + assert_eq!( + sum.temporality, + Temporality::Cumulative, + "Should produce cumulative" + ); + + let data_point = &sum.data_points[0]; + assert!(data_point.attributes.is_empty(), "Non-empty attribute set"); + assert_eq!(data_point.value, 50, "Unexpected data point value"); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn no_attr_delta_up_down_counter() { + let mut test_context = TestContext::new(Some(Temporality::Delta)); + let counter = test_context.i64_up_down_counter("test", "my_counter", "my_unit"); + + counter.add(50, &[]); + test_context.flush_metrics(); + + let sum = test_context.get_aggregation::>("my_counter", "my_unit"); + + assert_eq!(sum.data_points.len(), 1, "Expected only one data point"); + assert!(!sum.is_monotonic, "Should not produce monotonic."); + assert_eq!(sum.temporality, Temporality::Delta, "Should produce Delta"); + + let data_point = &sum.data_points[0]; + assert!(data_point.attributes.is_empty(), "Non-empty attribute set"); + assert_eq!(data_point.value, 50, "Unexpected data point value"); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn no_attr_cumulative_counter_value_added_after_export() { + let mut test_context = TestContext::new(Some(Temporality::Cumulative)); + let counter = test_context.u64_counter("test", "my_counter", "my_unit"); + + counter.add(50, &[]); + test_context.flush_metrics(); + let _ = test_context.get_aggregation::>("my_counter", "my_unit"); + + counter.add(5, &[]); + test_context.flush_metrics(); + let sum = test_context.get_aggregation::>("my_counter", "my_unit"); + + assert_eq!(sum.data_points.len(), 1, "Expected only one data point"); + assert!(sum.is_monotonic, "Should produce monotonic."); + assert_eq!( + sum.temporality, + Temporality::Cumulative, + "Should produce cumulative" + ); + + let data_point = &sum.data_points[0]; + assert!(data_point.attributes.is_empty(), "Non-empty attribute set"); + assert_eq!(data_point.value, 55, "Unexpected data point value"); + } + + #[tokio::test(flavor = 
"multi_thread", worker_threads = 1)] + async fn no_attr_delta_counter_value_reset_after_export() { + let mut test_context = TestContext::new(Some(Temporality::Delta)); + let counter = test_context.u64_counter("test", "my_counter", "my_unit"); + + counter.add(50, &[]); + test_context.flush_metrics(); + let _ = test_context.get_aggregation::>("my_counter", "my_unit"); + + counter.add(5, &[]); + test_context.flush_metrics(); + let sum = test_context.get_aggregation::>("my_counter", "my_unit"); + + assert_eq!(sum.data_points.len(), 1, "Expected only one data point"); + assert!(sum.is_monotonic, "Should produce monotonic."); + assert_eq!( + sum.temporality, + Temporality::Delta, + "Should produce cumulative" + ); + + let data_point = &sum.data_points[0]; + assert!(data_point.attributes.is_empty(), "Non-empty attribute set"); + assert_eq!(data_point.value, 5, "Unexpected data point value"); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn second_delta_export_does_not_give_no_attr_value_if_add_not_called() { + let mut test_context = TestContext::new(Some(Temporality::Delta)); + let counter = test_context.u64_counter("test", "my_counter", "my_unit"); + + counter.add(50, &[]); + test_context.flush_metrics(); + let _ = test_context.get_aggregation::>("my_counter", "my_unit"); + + counter.add(50, &[KeyValue::new("a", "b")]); + test_context.flush_metrics(); + let sum = test_context.get_aggregation::>("my_counter", "my_unit"); + + let no_attr_data_point = sum.data_points.iter().find(|x| x.attributes.is_empty()); + + assert!( + no_attr_data_point.is_none(), + "Expected no data points with no attributes" + ); + } + + struct TestContext { + exporter: InMemoryMetricsExporter, + meter_provider: SdkMeterProvider, + + // Saving this on the test context for lifetime simplicity + resource_metrics: Vec, + } + + impl TestContext { + fn new(temporality: Option) -> Self { + struct TestTemporalitySelector(Temporality); + impl TemporalitySelector for TestTemporalitySelector { + fn temporality(&self, _kind: InstrumentKind) -> Temporality { + self.0 + } + } + + let mut exporter = InMemoryMetricsExporterBuilder::new(); + if let Some(temporality) = temporality { + exporter = exporter.with_temporality_selector(TestTemporalitySelector(temporality)); + } + + let exporter = exporter.build(); + let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build(); + let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); + + TestContext { + exporter, + meter_provider, + resource_metrics: vec![], + } + } + + fn u64_counter( + &self, + meter_name: &'static str, + counter_name: &'static str, + unit_name: &'static str, + ) -> Counter { + self.meter_provider + .meter(meter_name) + .u64_counter(counter_name) + .with_unit(Unit::new(unit_name)) + .init() + } + + fn i64_up_down_counter( + &self, + meter_name: &'static str, + counter_name: &'static str, + unit_name: &'static str, + ) -> UpDownCounter { + self.meter_provider + .meter(meter_name) + .i64_up_down_counter(counter_name) + .with_unit(Unit::new(unit_name)) + .init() + } + + fn flush_metrics(&self) { + self.meter_provider.force_flush().unwrap(); + } + + fn get_aggregation( + &mut self, + counter_name: &str, + unit_name: &str, + ) -> &T { + self.resource_metrics = self + .exporter + .get_finished_metrics() + .expect("metrics expected to be exported"); + + assert!( + !self.resource_metrics.is_empty(), + "no metrics were exported" + ); + + // Get the latest resource metric in case of multiple flushes/exports + let 
+        fn get_aggregation<T: data::Aggregation>(
+            &mut self,
+            counter_name: &str,
+            unit_name: &str,
+        ) -> &T {
+            self.resource_metrics = self
+                .exporter
+                .get_finished_metrics()
+                .expect("metrics expected to be exported");
+
+            assert!(
+                !self.resource_metrics.is_empty(),
+                "no metrics were exported"
+            );
+
+            // Get the latest resource metric in case of multiple flushes/exports
+            let resource_metric = self.resource_metrics.last().unwrap();
+
+            assert!(
+                !resource_metric.scope_metrics.is_empty(),
+                "No scope metrics in latest export"
+            );
+            assert!(!resource_metric.scope_metrics[0].metrics.is_empty());
+
+            let metric = &resource_metric.scope_metrics[0].metrics[0];
+            assert_eq!(metric.name, counter_name);
+            assert_eq!(metric.unit.as_str(), unit_name);
+
+            metric
+                .data
+                .as_any()
+                .downcast_ref::<T>()
+                .expect("Failed to cast aggregation to expected type")
+        }
+    }
 }