diff --git a/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h b/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h index 446ffac802..927c17e968 100644 --- a/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h +++ b/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h @@ -97,7 +97,8 @@ class DefaultAggregation case AggregationType::kSum: { bool is_monotonic = true; if (instrument_descriptor.type_ == InstrumentType::kUpDownCounter || - instrument_descriptor.type_ == InstrumentType::kObservableUpDownCounter) + instrument_descriptor.type_ == InstrumentType::kObservableUpDownCounter || + instrument_descriptor.type_ == InstrumentType::kHistogram) { is_monotonic = false; } diff --git a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h index 69920547ba..f601691fd1 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h @@ -27,16 +27,12 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora public: AsyncMetricStorage(InstrumentDescriptor instrument_descriptor, const AggregationType aggregation_type, - const AttributesProcessor *attributes_processor, - const AggregationConfig *aggregation_config, - void *state = nullptr) + const AggregationConfig *aggregation_config) : instrument_descriptor_(instrument_descriptor), aggregation_type_{aggregation_type}, - attributes_processor_{attributes_processor}, - state_{state}, cumulative_hash_map_(new AttributesHashMap()), delta_hash_map_(new AttributesHashMap()), - temporal_metric_storage_(instrument_descriptor, aggregation_config) + temporal_metric_storage_(instrument_descriptor, aggregation_type, aggregation_config) {} template @@ -116,8 +112,6 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora private: InstrumentDescriptor instrument_descriptor_; AggregationType aggregation_type_; - const AttributesProcessor *attributes_processor_; - void *state_; std::unique_ptr cumulative_hash_map_; std::unique_ptr delta_hash_map_; opentelemetry::common::SpinLockMutex hashmap_lock_; diff --git a/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h index c6a78451af..7099a44c46 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h @@ -33,17 +33,17 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage nostd::shared_ptr &&exemplar_reservoir, const AggregationConfig *aggregation_config) : instrument_descriptor_(instrument_descriptor), - aggregation_type_{aggregation_type}, attributes_hashmap_(new AttributesHashMap()), attributes_processor_{attributes_processor}, #ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW exemplar_reservoir_(exemplar_reservoir), #endif - temporal_metric_storage_(instrument_descriptor, aggregation_config) + temporal_metric_storage_(instrument_descriptor, aggregation_type, aggregation_config) { - create_default_aggregation_ = [&, aggregation_config]() -> std::unique_ptr { - return DefaultAggregation::CreateAggregation(aggregation_type_, instrument_descriptor_, + create_default_aggregation_ = [&, aggregation_type, + aggregation_config]() -> std::unique_ptr { + return DefaultAggregation::CreateAggregation(aggregation_type, instrument_descriptor_, aggregation_config); }; } @@ -120,7 +120,6 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage private: InstrumentDescriptor instrument_descriptor_; - AggregationType aggregation_type_; // hashmap to maintain the metrics for delta collection (i.e, collection since last Collect call) std::unique_ptr attributes_hashmap_; diff --git a/sdk/include/opentelemetry/sdk/metrics/state/temporal_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/temporal_metric_storage.h index 17fe28d8bf..4ddc593149 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/temporal_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/temporal_metric_storage.h @@ -27,6 +27,7 @@ class TemporalMetricStorage { public: TemporalMetricStorage(InstrumentDescriptor instrument_descriptor, + AggregationType aggregation_type, const AggregationConfig *aggregation_config); bool buildMetrics(CollectorHandle *collector, @@ -38,6 +39,7 @@ class TemporalMetricStorage private: InstrumentDescriptor instrument_descriptor_; + AggregationType aggregation_type_; // unreported metrics stash for all the collectors std::unordered_map>> diff --git a/sdk/src/metrics/meter.cc b/sdk/src/metrics/meter.cc index feae7fd3b8..6b8b6c8925 100644 --- a/sdk/src/metrics/meter.cc +++ b/sdk/src/metrics/meter.cc @@ -367,9 +367,8 @@ std::unique_ptr Meter::RegisterAsyncMetricStorage( { view_instr_desc.description_ = view.GetDescription(); } - auto storage = std::shared_ptr( - new AsyncMetricStorage(view_instr_desc, view.GetAggregationType(), - &view.GetAttributesProcessor(), view.GetAggregationConfig())); + auto storage = std::shared_ptr(new AsyncMetricStorage( + view_instr_desc, view.GetAggregationType(), view.GetAggregationConfig())); storage_registry_[instrument_descriptor.name_] = storage; static_cast(storages.get())->AddStorage(storage); return true; diff --git a/sdk/src/metrics/state/temporal_metric_storage.cc b/sdk/src/metrics/state/temporal_metric_storage.cc index f9c281b0c0..50803a80c0 100644 --- a/sdk/src/metrics/state/temporal_metric_storage.cc +++ b/sdk/src/metrics/state/temporal_metric_storage.cc @@ -17,8 +17,11 @@ namespace metrics { TemporalMetricStorage::TemporalMetricStorage(InstrumentDescriptor instrument_descriptor, + AggregationType aggregation_type, const AggregationConfig *aggregation_config) - : instrument_descriptor_(instrument_descriptor), aggregation_config_(aggregation_config) + : instrument_descriptor_(instrument_descriptor), + aggregation_type_(aggregation_type), + aggregation_config_(aggregation_config) {} bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector, @@ -64,9 +67,10 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector, } else { - merged_metrics->Set(attributes, DefaultAggregation::CreateAggregation( - instrument_descriptor_, aggregation_config_) - ->Merge(aggregation)); + merged_metrics->Set(attributes, + DefaultAggregation::CreateAggregation( + aggregation_type_, instrument_descriptor_, aggregation_config_) + ->Merge(aggregation)); } return true; }); @@ -88,21 +92,21 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector, if (aggregation_temporarily == AggregationTemporality::kCumulative) { // merge current delta to previous cumulative - last_aggr_hashmap->GetAllEnteries([&merged_metrics, this](const MetricAttributes &attributes, - Aggregation &aggregation) { - auto agg = merged_metrics->Get(attributes); - if (agg) - { - merged_metrics->Set(attributes, agg->Merge(aggregation)); - } - else - { - auto def_agg = - DefaultAggregation::CreateAggregation(instrument_descriptor_, aggregation_config_); - merged_metrics->Set(attributes, def_agg->Merge(aggregation)); - } - return true; - }); + last_aggr_hashmap->GetAllEnteries( + [&merged_metrics, this](const MetricAttributes &attributes, Aggregation &aggregation) { + auto agg = merged_metrics->Get(attributes); + if (agg) + { + merged_metrics->Set(attributes, agg->Merge(aggregation)); + } + else + { + auto def_agg = DefaultAggregation::CreateAggregation( + aggregation_type_, instrument_descriptor_, aggregation_config_); + merged_metrics->Set(attributes, def_agg->Merge(aggregation)); + } + return true; + }); } last_reported_metrics_[collector] = LastReportedMetrics{std::move(merged_metrics), collection_ts}; diff --git a/sdk/test/metrics/BUILD b/sdk/test/metrics/BUILD index 3904b15ea4..b554dd0a6f 100644 --- a/sdk/test/metrics/BUILD +++ b/sdk/test/metrics/BUILD @@ -252,6 +252,36 @@ cc_test( ], ) +cc_test( + name = "histogram_aggregation_test", + srcs = [ + "histogram_aggregation_test.cc", + ], + tags = [ + "metrics", + "test", + ], + deps = [ + "//sdk/src/metrics", + "@com_google_googletest//:gtest_main", + ], +) + +cc_test( + name = "sum_aggregation_test", + srcs = [ + "sum_aggregation_test.cc", + ], + tags = [ + "metrics", + "test", + ], + deps = [ + "//sdk/src/metrics", + "@com_google_googletest//:gtest_main", + ], +) + otel_cc_benchmark( name = "attributes_processor_benchmark", srcs = [ diff --git a/sdk/test/metrics/CMakeLists.txt b/sdk/test/metrics/CMakeLists.txt index 585023ebbf..97658217d7 100644 --- a/sdk/test/metrics/CMakeLists.txt +++ b/sdk/test/metrics/CMakeLists.txt @@ -4,6 +4,8 @@ foreach( meter_test view_registry_test aggregation_test + sum_aggregation_test + histogram_aggregation_test attributes_processor_test attributes_hashmap_test histogram_test diff --git a/sdk/test/metrics/async_metric_storage_test.cc b/sdk/test/metrics/async_metric_storage_test.cc index b70d1bf52d..f56a403862 100644 --- a/sdk/test/metrics/async_metric_storage_test.cc +++ b/sdk/test/metrics/async_metric_storage_test.cc @@ -65,10 +65,8 @@ TEST_P(WritableMetricStorageTestFixture, TestAggregation) std::vector> collectors; collectors.push_back(collector); - std::unique_ptr default_attributes_processor{ - new DefaultAttributesProcessor{}}; - opentelemetry::sdk::metrics::AsyncMetricStorage storage( - instr_desc, AggregationType::kSum, default_attributes_processor.get(), nullptr); + opentelemetry::sdk::metrics::AsyncMetricStorage storage(instr_desc, AggregationType::kSum, + nullptr); int64_t get_count1 = 20; int64_t put_count1 = 10; std::unordered_map measurements1 = { @@ -157,10 +155,8 @@ TEST_P(WritableMetricStorageTestObservableGaugeFixture, TestAggregation) std::vector> collectors; collectors.push_back(collector); - std::unique_ptr default_attributes_processor{ - new DefaultAttributesProcessor{}}; - opentelemetry::sdk::metrics::AsyncMetricStorage storage( - instr_desc, AggregationType::kLastValue, default_attributes_processor.get(), nullptr); + opentelemetry::sdk::metrics::AsyncMetricStorage storage(instr_desc, AggregationType::kLastValue, + nullptr); int64_t freq_cpu0 = 3; int64_t freq_cpu1 = 5; std::unordered_map measurements1 = { diff --git a/sdk/test/metrics/histogram_aggregation_test.cc b/sdk/test/metrics/histogram_aggregation_test.cc new file mode 100644 index 0000000000..99f6ec3d82 --- /dev/null +++ b/sdk/test/metrics/histogram_aggregation_test.cc @@ -0,0 +1,157 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#include "opentelemetry/common/macros.h" +#include "opentelemetry/sdk/metrics/data/point_data.h" +#include "opentelemetry/sdk/metrics/meter.h" +#include "opentelemetry/sdk/metrics/meter_context.h" +#include "opentelemetry/sdk/metrics/meter_provider.h" +#include "opentelemetry/sdk/metrics/metric_reader.h" +#include "opentelemetry/sdk/metrics/push_metric_exporter.h" + +#include + +#if OPENTELEMETRY_HAVE_WORKING_REGEX + +using namespace opentelemetry; +using namespace opentelemetry::sdk::instrumentationscope; +using namespace opentelemetry::sdk::metrics; + +class MockMetricExporter : public PushMetricExporter +{ +public: + MockMetricExporter() = default; + opentelemetry::sdk::common::ExportResult Export( + const ResourceMetrics & /* records */) noexcept override + { + return opentelemetry::sdk::common::ExportResult::kSuccess; + } + + AggregationTemporality GetAggregationTemporality( + InstrumentType /* instrument_type */) const noexcept override + { + return AggregationTemporality::kCumulative; + } + + bool ForceFlush(std::chrono::microseconds /* timeout */) noexcept override { return true; } + + bool Shutdown(std::chrono::microseconds /* timeout */) noexcept override { return true; } +}; + +class MockMetricReader : public MetricReader +{ +public: + MockMetricReader(std::unique_ptr exporter) : exporter_(std::move(exporter)) {} + AggregationTemporality GetAggregationTemporality( + InstrumentType instrument_type) const noexcept override + { + return exporter_->GetAggregationTemporality(instrument_type); + } + virtual bool OnForceFlush(std::chrono::microseconds /* timeout */) noexcept override + { + return true; + } + virtual bool OnShutDown(std::chrono::microseconds /* timeout */) noexcept override + { + return true; + } + virtual void OnInitialized() noexcept override {} + +private: + std::unique_ptr exporter_; +}; + +TEST(HistogramInstrumentToHistogramAggregation, Double) +{ + MeterProvider mp; + auto m = mp.GetMeter("meter1", "version1", "schema1"); + + std::unique_ptr exporter(new MockMetricExporter()); + std::shared_ptr reader{new MockMetricReader(std::move(exporter))}; + mp.AddMetricReader(reader); + + auto h = m->CreateDoubleHistogram("histogram1", "histogram1_description", "histogram1_unit"); + + h->Record(5, {}); + h->Record(10, {}); + h->Record(15, {}); + h->Record(20, {}); + h->Record(25, {}); + h->Record(30, {}); + h->Record(35, {}); + h->Record(40, {}); + h->Record(45, {}); + h->Record(50, {}); + h->Record(1e6, {}); + + std::vector actuals; + reader->Collect([&](ResourceMetrics &rm) { + for (const ScopeMetrics &smd : rm.scope_metric_data_) + { + for (const MetricData &md : smd.metric_data_) + { + for (const PointDataAttributes &dp : md.point_data_attr_) + { + actuals.push_back(opentelemetry::nostd::get(dp.point_data)); + } + } + } + return true; + }); + + ASSERT_EQ(1, actuals.size()); + const auto &actual = actuals.at(0); + ASSERT_EQ(1000275.0, opentelemetry::nostd::get(actual.sum_)); + ASSERT_EQ(11, actual.count_); +} + +TEST(CounterToHistogram, Double) +{ + MeterProvider mp; + auto m = mp.GetMeter("meter1", "version1", "schema1"); + + std::unique_ptr exporter(new MockMetricExporter()); + std::shared_ptr reader{new MockMetricReader(std::move(exporter))}; + mp.AddMetricReader(reader); + + std::unique_ptr view{new View("view1", "view1_description", AggregationType::kHistogram)}; + std::unique_ptr instrument_selector{ + new InstrumentSelector(InstrumentType::kCounter, "counter1")}; + std::unique_ptr meter_selector{new MeterSelector("meter1", "version1", "schema1")}; + mp.AddView(std::move(instrument_selector), std::move(meter_selector), std::move(view)); + + auto h = m->CreateDoubleCounter("counter1", "counter1_description", "counter1_unit"); + + h->Add(5, {}); + h->Add(10, {}); + h->Add(15, {}); + h->Add(20, {}); + h->Add(25, {}); + h->Add(30, {}); + h->Add(35, {}); + h->Add(40, {}); + h->Add(45, {}); + h->Add(50, {}); + h->Add(1e6, {}); + + std::vector actuals; + reader->Collect([&](ResourceMetrics &rm) { + for (const ScopeMetrics &smd : rm.scope_metric_data_) + { + for (const MetricData &md : smd.metric_data_) + { + for (const PointDataAttributes &dp : md.point_data_attr_) + { + actuals.push_back(opentelemetry::nostd::get(dp.point_data)); + } + } + } + return true; + }); + + ASSERT_EQ(1, actuals.size()); + const auto &actual = actuals.at(0); + ASSERT_EQ(1000275.0, opentelemetry::nostd::get(actual.sum_)); + ASSERT_EQ(11, actual.count_); +} +#endif diff --git a/sdk/test/metrics/sum_aggregation_test.cc b/sdk/test/metrics/sum_aggregation_test.cc new file mode 100644 index 0000000000..47022c866f --- /dev/null +++ b/sdk/test/metrics/sum_aggregation_test.cc @@ -0,0 +1,205 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#include "opentelemetry/common/macros.h" +#include "opentelemetry/sdk/metrics/data/point_data.h" +#include "opentelemetry/sdk/metrics/meter.h" +#include "opentelemetry/sdk/metrics/meter_context.h" +#include "opentelemetry/sdk/metrics/meter_provider.h" +#include "opentelemetry/sdk/metrics/metric_reader.h" +#include "opentelemetry/sdk/metrics/push_metric_exporter.h" + +#include + +#if OPENTELEMETRY_HAVE_WORKING_REGEX + +using namespace opentelemetry; +using namespace opentelemetry::sdk::instrumentationscope; +using namespace opentelemetry::sdk::metrics; + +class MockMetricExporter : public PushMetricExporter +{ +public: + MockMetricExporter() = default; + opentelemetry::sdk::common::ExportResult Export( + const ResourceMetrics & /* records */) noexcept override + { + return opentelemetry::sdk::common::ExportResult::kSuccess; + } + + AggregationTemporality GetAggregationTemporality( + InstrumentType /* instrument_type */) const noexcept override + { + return AggregationTemporality::kCumulative; + } + + bool ForceFlush(std::chrono::microseconds /* timeout */) noexcept override { return true; } + + bool Shutdown(std::chrono::microseconds /* timeout */) noexcept override { return true; } +}; + +class MockMetricReader : public MetricReader +{ +public: + MockMetricReader(std::unique_ptr exporter) : exporter_(std::move(exporter)) {} + AggregationTemporality GetAggregationTemporality( + InstrumentType instrument_type) const noexcept override + { + return exporter_->GetAggregationTemporality(instrument_type); + } + virtual bool OnForceFlush(std::chrono::microseconds /* timeout */) noexcept override + { + return true; + } + virtual bool OnShutDown(std::chrono::microseconds /* timeout */) noexcept override + { + return true; + } + virtual void OnInitialized() noexcept override {} + +private: + std::unique_ptr exporter_; +}; + +TEST(HistogramToSum, Double) +{ + MeterProvider mp; + auto m = mp.GetMeter("meter1", "version1", "schema1"); + + std::unique_ptr exporter(new MockMetricExporter()); + std::shared_ptr reader{new MockMetricReader(std::move(exporter))}; + mp.AddMetricReader(reader); + + std::unique_ptr view{new View("view1", "view1_description", AggregationType::kSum)}; + std::unique_ptr instrument_selector{ + new InstrumentSelector(InstrumentType::kHistogram, "histogram1")}; + std::unique_ptr meter_selector{new MeterSelector("meter1", "version1", "schema1")}; + mp.AddView(std::move(instrument_selector), std::move(meter_selector), std::move(view)); + + auto h = m->CreateDoubleHistogram("histogram1", "histogram1_description", "histogram1_unit"); + + h->Record(5, {}); + h->Record(10, {}); + h->Record(15, {}); + h->Record(20, {}); + h->Record(25, {}); + h->Record(30, {}); + h->Record(35, {}); + h->Record(40, {}); + h->Record(45, {}); + h->Record(50, {}); + h->Record(1e6, {}); + + std::vector actuals; + reader->Collect([&](ResourceMetrics &rm) { + for (const ScopeMetrics &smd : rm.scope_metric_data_) + { + for (const MetricData &md : smd.metric_data_) + { + for (const PointDataAttributes &dp : md.point_data_attr_) + { + actuals.push_back(opentelemetry::nostd::get(dp.point_data)); + } + } + } + return true; + }); + + ASSERT_EQ(1, actuals.size()); + const auto &actual = actuals.at(0); + ASSERT_EQ(1000275.0, opentelemetry::nostd::get(actual.value_)); +} + +TEST(CounterToSum, Double) +{ + MeterProvider mp; + auto m = mp.GetMeter("meter1", "version1", "schema1"); + + std::unique_ptr exporter(new MockMetricExporter()); + std::shared_ptr reader{new MockMetricReader(std::move(exporter))}; + mp.AddMetricReader(reader); + + std::unique_ptr view{new View("view1", "view1_description", AggregationType::kSum)}; + std::unique_ptr instrument_selector{ + new InstrumentSelector(InstrumentType::kCounter, "counter1")}; + std::unique_ptr meter_selector{new MeterSelector("meter1", "version1", "schema1")}; + mp.AddView(std::move(instrument_selector), std::move(meter_selector), std::move(view)); + + auto h = m->CreateDoubleCounter("counter1", "counter1_description", "counter1_unit"); + + h->Add(5, {}); + h->Add(10, {}); + h->Add(15, {}); + h->Add(20, {}); + h->Add(25, {}); + h->Add(30, {}); + h->Add(35, {}); + h->Add(40, {}); + h->Add(45, {}); + h->Add(50, {}); + h->Add(1e6, {}); + + std::vector actuals; + reader->Collect([&](ResourceMetrics &rm) { + for (const ScopeMetrics &smd : rm.scope_metric_data_) + { + for (const MetricData &md : smd.metric_data_) + { + for (const PointDataAttributes &dp : md.point_data_attr_) + { + actuals.push_back(opentelemetry::nostd::get(dp.point_data)); + } + } + } + return true; + }); + + ASSERT_EQ(1, actuals.size()); + const auto &actual = actuals.at(0); + ASSERT_EQ(1000275.0, opentelemetry::nostd::get(actual.value_)); +} + +TEST(UpDownCounterToSum, Double) +{ + MeterProvider mp; + auto m = mp.GetMeter("meter1", "version1", "schema1"); + + std::unique_ptr exporter(new MockMetricExporter()); + std::shared_ptr reader{new MockMetricReader(std::move(exporter))}; + mp.AddMetricReader(reader); + + std::unique_ptr view{new View("view1", "view1_description", AggregationType::kSum)}; + std::unique_ptr instrument_selector{ + new InstrumentSelector(InstrumentType::kUpDownCounter, "counter1")}; + std::unique_ptr meter_selector{new MeterSelector("meter1", "version1", "schema1")}; + mp.AddView(std::move(instrument_selector), std::move(meter_selector), std::move(view)); + + auto h = m->CreateDoubleUpDownCounter("counter1", "counter1_description", "counter1_unit"); + + h->Add(5, {}); + h->Add(10, {}); + h->Add(-15, {}); + h->Add(20, {}); + h->Add(25, {}); + h->Add(-30, {}); + + std::vector actuals; + reader->Collect([&](ResourceMetrics &rm) { + for (const ScopeMetrics &smd : rm.scope_metric_data_) + { + for (const MetricData &md : smd.metric_data_) + { + for (const PointDataAttributes &dp : md.point_data_attr_) + { + actuals.push_back(opentelemetry::nostd::get(dp.point_data)); + } + } + } + return true; + }); + + ASSERT_EQ(1, actuals.size()); + const auto &actual = actuals.at(0); + ASSERT_EQ(15.0, opentelemetry::nostd::get(actual.value_)); +} +#endif