From d81f9c95697719b0ef7de31f930124a1ac63f20d Mon Sep 17 00:00:00 2001 From: Lalit Date: Fri, 30 Sep 2022 10:47:37 -0700 Subject: [PATCH 1/4] fix --- .../common/metrics_foo_library/foo_library.cc | 8 ++- examples/metrics_simple/metrics_ostream.cc | 2 +- .../sdk/metrics/state/async_metric_storage.h | 6 +- sdk/src/metrics/meter.cc | 1 - .../metrics/state/temporal_metric_storage.cc | 1 + sdk/test/metrics/async_metric_storage_test.cc | 58 +++++++++++++++---- 6 files changed, 60 insertions(+), 16 deletions(-) diff --git a/examples/common/metrics_foo_library/foo_library.cc b/examples/common/metrics_foo_library/foo_library.cc index ec4fec736d..109660b22c 100644 --- a/examples/common/metrics_foo_library/foo_library.cc +++ b/examples/common/metrics_foo_library/foo_library.cc @@ -12,6 +12,8 @@ # include "opentelemetry/metrics/provider.h" # include "opentelemetry/nostd/shared_ptr.h" +# include + namespace nostd = opentelemetry::nostd; namespace metrics_api = opentelemetry::metrics; @@ -29,6 +31,8 @@ std::map get_random_attr() labels[rand() % (labels.size() - 1)]}; } +static double val = 1.0; + class MeasurementFetcher { public: @@ -39,10 +43,12 @@ class MeasurementFetcher if (nostd::holds_alternative< nostd::shared_ptr>>(observer_result)) { - double val = (rand() % 700) + 1.1; + // double val = (rand() % 700) + 1.1; + // val = 1.0; nostd::get>>( observer_result) ->Observe(val /*, labelkv */); + val += 2.0; } } }; diff --git a/examples/metrics_simple/metrics_ostream.cc b/examples/metrics_simple/metrics_ostream.cc index 669362830b..9ee1dc1c25 100644 --- a/examples/metrics_simple/metrics_ostream.cc +++ b/examples/metrics_simple/metrics_ostream.cc @@ -62,7 +62,7 @@ void initMetrics(const std::string &name) std::unique_ptr observable_meter_selector{ new metric_sdk::MeterSelector(name, version, schema)}; std::unique_ptr observable_sum_view{ - new metric_sdk::View{name, "description", metric_sdk::AggregationType::kSum}}; + new metric_sdk::View{name, "test_description", metric_sdk::AggregationType::kSum}}; p->AddView(std::move(observable_instrument_selector), std::move(observable_meter_selector), std::move(observable_sum_view)); diff --git a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h index 79731a80bc..0b1560a0da 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h @@ -53,9 +53,9 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora if (prev) { auto delta = prev->Diff(*aggr); - cumulative_hash_map_->Set(measurement.first, - DefaultAggregation::CloneAggregation( - aggregation_type_, instrument_descriptor_, *delta)); + cumulative_hash_map_->Set(measurement.first, std::move(aggr)); + /* DefaultAggregation::CloneAggregation( + aggregation_type_, instrument_descriptor_, *delta));*/ delta_hash_map_->Set(measurement.first, std::move(delta)); } else diff --git a/sdk/src/metrics/meter.cc b/sdk/src/metrics/meter.cc index 62f6eaf313..4d6595dd7f 100644 --- a/sdk/src/metrics/meter.cc +++ b/sdk/src/metrics/meter.cc @@ -293,7 +293,6 @@ std::unique_ptr Meter::RegisterAsyncMetricStorage( std::vector Meter::Collect(CollectorHandle *collector, opentelemetry::common::SystemTimestamp collect_ts) noexcept { - observable_registry_->Observe(collect_ts); std::vector metric_data_list; auto ctx = meter_context_.lock(); diff --git a/sdk/src/metrics/state/temporal_metric_storage.cc b/sdk/src/metrics/state/temporal_metric_storage.cc index 3c8f80695e..b8fd38d794 100644 --- a/sdk/src/metrics/state/temporal_metric_storage.cc +++ b/sdk/src/metrics/state/temporal_metric_storage.cc @@ -34,6 +34,7 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector, opentelemetry::common::SystemTimestamp last_collection_ts = sdk_start_ts; AggregationTemporality aggregation_temporarily = collector->GetAggregationTemporality(instrument_descriptor_.type_); + if (delta_metrics->Size()) { for (auto &col : collectors) diff --git a/sdk/test/metrics/async_metric_storage_test.cc b/sdk/test/metrics/async_metric_storage_test.cc index 02d2734ecf..740d1944bb 100644 --- a/sdk/test/metrics/async_metric_storage_test.cc +++ b/sdk/test/metrics/async_metric_storage_test.cc @@ -107,12 +107,12 @@ TEST_P(WritableMetricStorageTestFixture, TestAggregation) opentelemetry::sdk::metrics::AsyncMetricStorage storage( instr_desc, AggregationType::kSum, default_attributes_processor.get(), std::shared_ptr{}); - long get_count = 20l; - long put_count = 10l; - size_t attribute_count = 2; - std::unordered_map measurements = { - {{{"RequestType", "GET"}}, get_count}, {{{"RequestType", "PUT"}}, put_count}}; - storage.RecordLong(measurements, + long get_count1 = 20l; + long put_count1 = 10l; + size_t attribute_count = 2; + std::unordered_map measurements1 = { + {{{"RequestType", "GET"}}, get_count1}, {{{"RequestType", "PUT"}}, put_count1}}; + storage.RecordLong(measurements1, opentelemetry::common::SystemTimestamp(std::chrono::system_clock::now())); storage.Collect(collector.get(), collectors, sdk_start_ts, collection_ts, @@ -123,20 +123,58 @@ TEST_P(WritableMetricStorageTestFixture, TestAggregation) if (opentelemetry::nostd::get( data_attr.attributes.find("RequestType")->second) == "GET") { - EXPECT_EQ(opentelemetry::nostd::get(data.value_), get_count); + EXPECT_EQ(opentelemetry::nostd::get(data.value_), get_count1); } else if (opentelemetry::nostd::get( data_attr.attributes.find("RequestType")->second) == "PUT") { - EXPECT_EQ(opentelemetry::nostd::get(data.value_), put_count); + EXPECT_EQ(opentelemetry::nostd::get(data.value_), put_count1); } } return true; }); // subsequent recording after collection shouldn't fail - storage.RecordLong(measurements, + // monotonic increasing values; + long get_count2 = 50l; + long put_count2 = 70l; + + std::unordered_map measurements2 = { + {{{"RequestType", "GET"}}, get_count2}, {{{"RequestType", "PUT"}}, put_count2}}; + storage.RecordLong(measurements2, opentelemetry::common::SystemTimestamp(std::chrono::system_clock::now())); - EXPECT_EQ(MeasurementFetcher::number_of_attributes, attribute_count); + storage.Collect( + collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData data) { + for (auto data_attr : data.point_data_attr_) + { + auto data = opentelemetry::nostd::get(data_attr.point_data); + if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "GET") + { + if (temporality == AggregationTemporality::kCumulative) + { + EXPECT_EQ(opentelemetry::nostd::get(data.value_), get_count2); + } + else + { + EXPECT_EQ(opentelemetry::nostd::get(data.value_), get_count2 - get_count1); + } + } + else if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "PUT") + { + if (temporality == AggregationTemporality::kCumulative) + { + EXPECT_EQ(opentelemetry::nostd::get(data.value_), put_count2); + } + else + { + EXPECT_EQ(opentelemetry::nostd::get(data.value_), + put_count2 - put_count1); // 50 - 30 + } + } + } + return true; + }); } INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestLong, From 686a31cb4d943a4f2f8bf9e596cf4fa845a94987 Mon Sep 17 00:00:00 2001 From: Lalit Date: Fri, 30 Sep 2022 11:05:39 -0700 Subject: [PATCH 2/4] example --- examples/common/metrics_foo_library/foo_library.cc | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/examples/common/metrics_foo_library/foo_library.cc b/examples/common/metrics_foo_library/foo_library.cc index 109660b22c..5fd32524d2 100644 --- a/examples/common/metrics_foo_library/foo_library.cc +++ b/examples/common/metrics_foo_library/foo_library.cc @@ -12,8 +12,6 @@ # include "opentelemetry/metrics/provider.h" # include "opentelemetry/nostd/shared_ptr.h" -# include - namespace nostd = opentelemetry::nostd; namespace metrics_api = opentelemetry::metrics; @@ -31,8 +29,6 @@ std::map get_random_attr() labels[rand() % (labels.size() - 1)]}; } -static double val = 1.0; - class MeasurementFetcher { public: @@ -43,15 +39,16 @@ class MeasurementFetcher if (nostd::holds_alternative< nostd::shared_ptr>>(observer_result)) { - // double val = (rand() % 700) + 1.1; - // val = 1.0; + double random_incr = (rand() % 5) + 1.1; + value_ += random_incr; nostd::get>>( observer_result) - ->Observe(val /*, labelkv */); - val += 2.0; + ->Observe(value_ /*, labelkv */); } } + static double value_; }; +double MeasurementFetcher::value_ = 0.0; } // namespace void foo_library::counter_example(const std::string &name) From fd738047972bf56860a97e4d6da87eb8bc669de7 Mon Sep 17 00:00:00 2001 From: Lalit Date: Fri, 30 Sep 2022 11:10:40 -0700 Subject: [PATCH 3/4] add comments --- .../sdk/metrics/state/async_metric_storage.h | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h index 0b1560a0da..db9c61741d 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h @@ -43,7 +43,9 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora void Record(const std::unordered_map &measurements, opentelemetry::common::SystemTimestamp /* observation_time */) noexcept { - // process the read measurements - aggregate and store in hashmap + // Async counter always record monotonically increasing values, and the + // exporter/reader can request either for delta or cumulative value. + // So we convert the async counter value to delta before passing it to temporal storage. std::lock_guard guard(hashmap_lock_); for (auto &measurement : measurements) { @@ -53,13 +55,14 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora if (prev) { auto delta = prev->Diff(*aggr); + // store received value in cumulative map, and the diff in delta map (to pass it to temporal + // storage) cumulative_hash_map_->Set(measurement.first, std::move(aggr)); - /* DefaultAggregation::CloneAggregation( - aggregation_type_, instrument_descriptor_, *delta));*/ delta_hash_map_->Set(measurement.first, std::move(delta)); } else { + // store received value in cumulative and delta map. cumulative_hash_map_->Set( measurement.first, DefaultAggregation::CloneAggregation(aggregation_type_, instrument_descriptor_, *aggr)); From bc3107a5490f780c2df1fcde254a9f893927a7c9 Mon Sep 17 00:00:00 2001 From: Lalit Date: Sun, 2 Oct 2022 12:01:53 -0700 Subject: [PATCH 4/4] redundant comment --- sdk/test/metrics/async_metric_storage_test.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdk/test/metrics/async_metric_storage_test.cc b/sdk/test/metrics/async_metric_storage_test.cc index 740d1944bb..681547f171 100644 --- a/sdk/test/metrics/async_metric_storage_test.cc +++ b/sdk/test/metrics/async_metric_storage_test.cc @@ -168,8 +168,7 @@ TEST_P(WritableMetricStorageTestFixture, TestAggregation) } else { - EXPECT_EQ(opentelemetry::nostd::get(data.value_), - put_count2 - put_count1); // 50 - 30 + EXPECT_EQ(opentelemetry::nostd::get(data.value_), put_count2 - put_count1); } } }