Skip to content

Commit

Permalink
Fix 1585 - Multiple cumulative metric collections without measurement…
Browse files Browse the repository at this point in the history
… recording. (open-telemetry#1586)
  • Loading branch information
lalitb authored and yxue committed Dec 5, 2022
1 parent 25e1f4f commit 4cac164
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 18 deletions.
37 changes: 20 additions & 17 deletions sdk/src/metrics/state/temporal_metric_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector,
opentelemetry::common::SystemTimestamp last_collection_ts = sdk_start_ts;
AggregationTemporality aggregation_temporarily =
collector->GetAggregationTemporality(instrument_descriptor_.type_);
for (auto &col : collectors)
if (delta_metrics->Size())
{
unreported_metrics_[col.get()].push_back(delta_metrics);
for (auto &col : collectors)
{
unreported_metrics_[col.get()].push_back(delta_metrics);
}
}

// Get the unreported metrics for the `collector` from `unreported metrics stash`
Expand Down Expand Up @@ -88,20 +91,20 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector,
if (aggregation_temporarily == AggregationTemporality::kCumulative)
{
// merge current delta to previous cumulative
last_aggr_hashmap->GetAllEnteries([&merged_metrics, this](const MetricAttributes &attributes,
Aggregation &aggregation) {
auto agg = merged_metrics->Get(attributes);
if (agg)
{
merged_metrics->Set(attributes, agg->Merge(aggregation));
}
else
{
merged_metrics->Set(
attributes, DefaultAggregation::CreateAggregation(instrument_descriptor_, nullptr));
}
return true;
});
last_aggr_hashmap->GetAllEnteries(
[&merged_metrics, this](const MetricAttributes &attributes, Aggregation &aggregation) {
auto agg = merged_metrics->Get(attributes);
if (agg)
{
merged_metrics->Set(attributes, agg->Merge(aggregation));
}
else
{
auto def_agg = DefaultAggregation::CreateAggregation(instrument_descriptor_, nullptr);
merged_metrics->Set(attributes, def_agg->Merge(aggregation));
}
return true;
});
}
last_reported_metrics_[collector] =
LastReportedMetrics{std::move(merged_metrics), collection_ts};
Expand Down Expand Up @@ -137,4 +140,4 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector,

} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
#endif
#endif
62 changes: 61 additions & 1 deletion sdk/test/metrics/sync_metric_storage_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ TEST_P(WritableMetricStorageTestFixture, LongSumAggregation)
}
return true;
});

EXPECT_EQ(count_attributes, 2); // GET and PUT
// In case of delta temporarily, subsequent collection would contain new data points, so resetting
// the counts
if (temporality == AggregationTemporality::kDelta)
Expand All @@ -105,6 +105,34 @@ TEST_P(WritableMetricStorageTestFixture, LongSumAggregation)
expected_total_put_requests = 0;
}

// collect one more time.
collection_ts = std::chrono::system_clock::now();
count_attributes = 0;
storage.Collect(
collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData data) {
for (auto data_attr : data.point_data_attr_)
{
auto data = opentelemetry::nostd::get<SumPointData>(data_attr.point_data);
if (opentelemetry::nostd::get<std::string>(
data_attr.attributes.find("RequestType")->second) == "GET")
{
count_attributes++;
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), expected_total_get_requests);
}
else if (opentelemetry::nostd::get<std::string>(
data_attr.attributes.find("RequestType")->second) == "PUT")
{
count_attributes++;
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), expected_total_put_requests);
}
}
return true;
});
if (temporality == AggregationTemporality::kCumulative)
{
EXPECT_EQ(count_attributes, 2); // GET AND PUT
}

storage.RecordLong(50l, KeyValueIterableView<std::map<std::string, std::string>>(attributes_get),
opentelemetry::context::Context{});
expected_total_get_requests += 50;
Expand Down Expand Up @@ -134,7 +162,9 @@ TEST_P(WritableMetricStorageTestFixture, LongSumAggregation)
}
return true;
});
EXPECT_EQ(count_attributes, 2); // GET and PUT
}

INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestLong,
WritableMetricStorageTestFixture,
::testing::Values(AggregationTemporality::kCumulative,
Expand Down Expand Up @@ -205,6 +235,7 @@ TEST_P(WritableMetricStorageTestFixture, DoubleSumAggregation)
}
return true;
});
EXPECT_EQ(count_attributes, 2); // GET and PUT

// In case of delta temporarily, subsequent collection would contain new data points, so resetting
// the counts
Expand All @@ -214,6 +245,34 @@ TEST_P(WritableMetricStorageTestFixture, DoubleSumAggregation)
expected_total_put_requests = 0;
}

// collect one more time.
collection_ts = std::chrono::system_clock::now();
count_attributes = 0;
storage.Collect(
collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData data) {
for (auto data_attr : data.point_data_attr_)
{
auto data = opentelemetry::nostd::get<SumPointData>(data_attr.point_data);
if (opentelemetry::nostd::get<std::string>(
data_attr.attributes.find("RequestType")->second) == "GET")
{
count_attributes++;
EXPECT_EQ(opentelemetry::nostd::get<double>(data.value_), expected_total_get_requests);
}
else if (opentelemetry::nostd::get<std::string>(
data_attr.attributes.find("RequestType")->second) == "PUT")
{
count_attributes++;
EXPECT_EQ(opentelemetry::nostd::get<double>(data.value_), expected_total_put_requests);
}
}
return true;
});
if (temporality == AggregationTemporality::kCumulative)
{
EXPECT_EQ(count_attributes, 2); // GET AND PUT
}

storage.RecordDouble(50.0,
KeyValueIterableView<std::map<std::string, std::string>>(attributes_get),
opentelemetry::context::Context{});
Expand Down Expand Up @@ -245,6 +304,7 @@ TEST_P(WritableMetricStorageTestFixture, DoubleSumAggregation)
}
return true;
});
EXPECT_EQ(count_attributes, 2); // GET and PUT
}
INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestDouble,
WritableMetricStorageTestFixture,
Expand Down

0 comments on commit 4cac164

Please sign in to comment.