Skip to content

Commit

Permalink
Custom Aggregation support (#1899)
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb authored Jan 23, 2023
1 parent e8fce84 commit 998badd
Show file tree
Hide file tree
Showing 11 changed files with 433 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ class DefaultAggregation
case AggregationType::kSum: {
bool is_monotonic = true;
if (instrument_descriptor.type_ == InstrumentType::kUpDownCounter ||
instrument_descriptor.type_ == InstrumentType::kObservableUpDownCounter)
instrument_descriptor.type_ == InstrumentType::kObservableUpDownCounter ||
instrument_descriptor.type_ == InstrumentType::kHistogram)
{
is_monotonic = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,12 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora
public:
AsyncMetricStorage(InstrumentDescriptor instrument_descriptor,
const AggregationType aggregation_type,
const AttributesProcessor *attributes_processor,
const AggregationConfig *aggregation_config,
void *state = nullptr)
const AggregationConfig *aggregation_config)
: instrument_descriptor_(instrument_descriptor),
aggregation_type_{aggregation_type},
attributes_processor_{attributes_processor},
state_{state},
cumulative_hash_map_(new AttributesHashMap()),
delta_hash_map_(new AttributesHashMap()),
temporal_metric_storage_(instrument_descriptor, aggregation_config)
temporal_metric_storage_(instrument_descriptor, aggregation_type, aggregation_config)
{}

template <class T>
Expand Down Expand Up @@ -116,8 +112,6 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora
private:
InstrumentDescriptor instrument_descriptor_;
AggregationType aggregation_type_;
const AttributesProcessor *attributes_processor_;
void *state_;
std::unique_ptr<AttributesHashMap> cumulative_hash_map_;
std::unique_ptr<AttributesHashMap> delta_hash_map_;
opentelemetry::common::SpinLockMutex hashmap_lock_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,17 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage
nostd::shared_ptr<ExemplarReservoir> &&exemplar_reservoir,
const AggregationConfig *aggregation_config)
: instrument_descriptor_(instrument_descriptor),
aggregation_type_{aggregation_type},
attributes_hashmap_(new AttributesHashMap()),
attributes_processor_{attributes_processor},
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
exemplar_reservoir_(exemplar_reservoir),
#endif
temporal_metric_storage_(instrument_descriptor, aggregation_config)
temporal_metric_storage_(instrument_descriptor, aggregation_type, aggregation_config)

{
create_default_aggregation_ = [&, aggregation_config]() -> std::unique_ptr<Aggregation> {
return DefaultAggregation::CreateAggregation(aggregation_type_, instrument_descriptor_,
create_default_aggregation_ = [&, aggregation_type,
aggregation_config]() -> std::unique_ptr<Aggregation> {
return DefaultAggregation::CreateAggregation(aggregation_type, instrument_descriptor_,
aggregation_config);
};
}
Expand Down Expand Up @@ -120,7 +120,6 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage

private:
InstrumentDescriptor instrument_descriptor_;
AggregationType aggregation_type_;

// hashmap to maintain the metrics for delta collection (i.e, collection since last Collect call)
std::unique_ptr<AttributesHashMap> attributes_hashmap_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class TemporalMetricStorage
{
public:
TemporalMetricStorage(InstrumentDescriptor instrument_descriptor,
AggregationType aggregation_type,
const AggregationConfig *aggregation_config);

bool buildMetrics(CollectorHandle *collector,
Expand All @@ -38,6 +39,7 @@ class TemporalMetricStorage

private:
InstrumentDescriptor instrument_descriptor_;
AggregationType aggregation_type_;

// unreported metrics stash for all the collectors
std::unordered_map<CollectorHandle *, std::list<std::shared_ptr<AttributesHashMap>>>
Expand Down
5 changes: 2 additions & 3 deletions sdk/src/metrics/meter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -367,9 +367,8 @@ std::unique_ptr<AsyncWritableMetricStorage> Meter::RegisterAsyncMetricStorage(
{
view_instr_desc.description_ = view.GetDescription();
}
auto storage = std::shared_ptr<AsyncMetricStorage>(
new AsyncMetricStorage(view_instr_desc, view.GetAggregationType(),
&view.GetAttributesProcessor(), view.GetAggregationConfig()));
auto storage = std::shared_ptr<AsyncMetricStorage>(new AsyncMetricStorage(
view_instr_desc, view.GetAggregationType(), view.GetAggregationConfig()));
storage_registry_[instrument_descriptor.name_] = storage;
static_cast<AsyncMultiMetricStorage *>(storages.get())->AddStorage(storage);
return true;
Expand Down
42 changes: 23 additions & 19 deletions sdk/src/metrics/state/temporal_metric_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ namespace metrics
{

TemporalMetricStorage::TemporalMetricStorage(InstrumentDescriptor instrument_descriptor,
AggregationType aggregation_type,
const AggregationConfig *aggregation_config)
: instrument_descriptor_(instrument_descriptor), aggregation_config_(aggregation_config)
: instrument_descriptor_(instrument_descriptor),
aggregation_type_(aggregation_type),
aggregation_config_(aggregation_config)
{}

bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector,
Expand Down Expand Up @@ -64,9 +67,10 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector,
}
else
{
merged_metrics->Set(attributes, DefaultAggregation::CreateAggregation(
instrument_descriptor_, aggregation_config_)
->Merge(aggregation));
merged_metrics->Set(attributes,
DefaultAggregation::CreateAggregation(
aggregation_type_, instrument_descriptor_, aggregation_config_)
->Merge(aggregation));
}
return true;
});
Expand All @@ -88,21 +92,21 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector,
if (aggregation_temporarily == AggregationTemporality::kCumulative)
{
// merge current delta to previous cumulative
last_aggr_hashmap->GetAllEnteries([&merged_metrics, this](const MetricAttributes &attributes,
Aggregation &aggregation) {
auto agg = merged_metrics->Get(attributes);
if (agg)
{
merged_metrics->Set(attributes, agg->Merge(aggregation));
}
else
{
auto def_agg =
DefaultAggregation::CreateAggregation(instrument_descriptor_, aggregation_config_);
merged_metrics->Set(attributes, def_agg->Merge(aggregation));
}
return true;
});
last_aggr_hashmap->GetAllEnteries(
[&merged_metrics, this](const MetricAttributes &attributes, Aggregation &aggregation) {
auto agg = merged_metrics->Get(attributes);
if (agg)
{
merged_metrics->Set(attributes, agg->Merge(aggregation));
}
else
{
auto def_agg = DefaultAggregation::CreateAggregation(
aggregation_type_, instrument_descriptor_, aggregation_config_);
merged_metrics->Set(attributes, def_agg->Merge(aggregation));
}
return true;
});
}
last_reported_metrics_[collector] =
LastReportedMetrics{std::move(merged_metrics), collection_ts};
Expand Down
30 changes: 30 additions & 0 deletions sdk/test/metrics/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,36 @@ cc_test(
],
)

cc_test(
name = "histogram_aggregation_test",
srcs = [
"histogram_aggregation_test.cc",
],
tags = [
"metrics",
"test",
],
deps = [
"//sdk/src/metrics",
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "sum_aggregation_test",
srcs = [
"sum_aggregation_test.cc",
],
tags = [
"metrics",
"test",
],
deps = [
"//sdk/src/metrics",
"@com_google_googletest//:gtest_main",
],
)

otel_cc_benchmark(
name = "attributes_processor_benchmark",
srcs = [
Expand Down
2 changes: 2 additions & 0 deletions sdk/test/metrics/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ foreach(
meter_test
view_registry_test
aggregation_test
sum_aggregation_test
histogram_aggregation_test
attributes_processor_test
attributes_hashmap_test
histogram_test
Expand Down
12 changes: 4 additions & 8 deletions sdk/test/metrics/async_metric_storage_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,8 @@ TEST_P(WritableMetricStorageTestFixture, TestAggregation)
std::vector<std::shared_ptr<CollectorHandle>> collectors;
collectors.push_back(collector);

std::unique_ptr<AttributesProcessor> default_attributes_processor{
new DefaultAttributesProcessor{}};
opentelemetry::sdk::metrics::AsyncMetricStorage storage(
instr_desc, AggregationType::kSum, default_attributes_processor.get(), nullptr);
opentelemetry::sdk::metrics::AsyncMetricStorage storage(instr_desc, AggregationType::kSum,
nullptr);
int64_t get_count1 = 20;
int64_t put_count1 = 10;
std::unordered_map<MetricAttributes, int64_t, AttributeHashGenerator> measurements1 = {
Expand Down Expand Up @@ -157,10 +155,8 @@ TEST_P(WritableMetricStorageTestObservableGaugeFixture, TestAggregation)
std::vector<std::shared_ptr<CollectorHandle>> collectors;
collectors.push_back(collector);

std::unique_ptr<AttributesProcessor> default_attributes_processor{
new DefaultAttributesProcessor{}};
opentelemetry::sdk::metrics::AsyncMetricStorage storage(
instr_desc, AggregationType::kLastValue, default_attributes_processor.get(), nullptr);
opentelemetry::sdk::metrics::AsyncMetricStorage storage(instr_desc, AggregationType::kLastValue,
nullptr);
int64_t freq_cpu0 = 3;
int64_t freq_cpu1 = 5;
std::unordered_map<MetricAttributes, int64_t, AttributeHashGenerator> measurements1 = {
Expand Down
157 changes: 157 additions & 0 deletions sdk/test/metrics/histogram_aggregation_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#include "opentelemetry/common/macros.h"
#include "opentelemetry/sdk/metrics/data/point_data.h"
#include "opentelemetry/sdk/metrics/meter.h"
#include "opentelemetry/sdk/metrics/meter_context.h"
#include "opentelemetry/sdk/metrics/meter_provider.h"
#include "opentelemetry/sdk/metrics/metric_reader.h"
#include "opentelemetry/sdk/metrics/push_metric_exporter.h"

#include <gtest/gtest.h>

#if OPENTELEMETRY_HAVE_WORKING_REGEX

using namespace opentelemetry;
using namespace opentelemetry::sdk::instrumentationscope;
using namespace opentelemetry::sdk::metrics;

class MockMetricExporter : public PushMetricExporter
{
public:
MockMetricExporter() = default;
opentelemetry::sdk::common::ExportResult Export(
const ResourceMetrics & /* records */) noexcept override
{
return opentelemetry::sdk::common::ExportResult::kSuccess;
}

AggregationTemporality GetAggregationTemporality(
InstrumentType /* instrument_type */) const noexcept override
{
return AggregationTemporality::kCumulative;
}

bool ForceFlush(std::chrono::microseconds /* timeout */) noexcept override { return true; }

bool Shutdown(std::chrono::microseconds /* timeout */) noexcept override { return true; }
};

class MockMetricReader : public MetricReader
{
public:
MockMetricReader(std::unique_ptr<PushMetricExporter> exporter) : exporter_(std::move(exporter)) {}
AggregationTemporality GetAggregationTemporality(
InstrumentType instrument_type) const noexcept override
{
return exporter_->GetAggregationTemporality(instrument_type);
}
virtual bool OnForceFlush(std::chrono::microseconds /* timeout */) noexcept override
{
return true;
}
virtual bool OnShutDown(std::chrono::microseconds /* timeout */) noexcept override
{
return true;
}
virtual void OnInitialized() noexcept override {}

private:
std::unique_ptr<PushMetricExporter> exporter_;
};

TEST(HistogramInstrumentToHistogramAggregation, Double)
{
MeterProvider mp;
auto m = mp.GetMeter("meter1", "version1", "schema1");

std::unique_ptr<MockMetricExporter> exporter(new MockMetricExporter());
std::shared_ptr<MetricReader> reader{new MockMetricReader(std::move(exporter))};
mp.AddMetricReader(reader);

auto h = m->CreateDoubleHistogram("histogram1", "histogram1_description", "histogram1_unit");

h->Record(5, {});
h->Record(10, {});
h->Record(15, {});
h->Record(20, {});
h->Record(25, {});
h->Record(30, {});
h->Record(35, {});
h->Record(40, {});
h->Record(45, {});
h->Record(50, {});
h->Record(1e6, {});

std::vector<HistogramPointData> actuals;
reader->Collect([&](ResourceMetrics &rm) {
for (const ScopeMetrics &smd : rm.scope_metric_data_)
{
for (const MetricData &md : smd.metric_data_)
{
for (const PointDataAttributes &dp : md.point_data_attr_)
{
actuals.push_back(opentelemetry::nostd::get<HistogramPointData>(dp.point_data));
}
}
}
return true;
});

ASSERT_EQ(1, actuals.size());
const auto &actual = actuals.at(0);
ASSERT_EQ(1000275.0, opentelemetry::nostd::get<double>(actual.sum_));
ASSERT_EQ(11, actual.count_);
}

TEST(CounterToHistogram, Double)
{
MeterProvider mp;
auto m = mp.GetMeter("meter1", "version1", "schema1");

std::unique_ptr<MockMetricExporter> exporter(new MockMetricExporter());
std::shared_ptr<MetricReader> reader{new MockMetricReader(std::move(exporter))};
mp.AddMetricReader(reader);

std::unique_ptr<View> view{new View("view1", "view1_description", AggregationType::kHistogram)};
std::unique_ptr<InstrumentSelector> instrument_selector{
new InstrumentSelector(InstrumentType::kCounter, "counter1")};
std::unique_ptr<MeterSelector> meter_selector{new MeterSelector("meter1", "version1", "schema1")};
mp.AddView(std::move(instrument_selector), std::move(meter_selector), std::move(view));

auto h = m->CreateDoubleCounter("counter1", "counter1_description", "counter1_unit");

h->Add(5, {});
h->Add(10, {});
h->Add(15, {});
h->Add(20, {});
h->Add(25, {});
h->Add(30, {});
h->Add(35, {});
h->Add(40, {});
h->Add(45, {});
h->Add(50, {});
h->Add(1e6, {});

std::vector<HistogramPointData> actuals;
reader->Collect([&](ResourceMetrics &rm) {
for (const ScopeMetrics &smd : rm.scope_metric_data_)
{
for (const MetricData &md : smd.metric_data_)
{
for (const PointDataAttributes &dp : md.point_data_attr_)
{
actuals.push_back(opentelemetry::nostd::get<HistogramPointData>(dp.point_data));
}
}
}
return true;
});

ASSERT_EQ(1, actuals.size());
const auto &actual = actuals.at(0);
ASSERT_EQ(1000275.0, opentelemetry::nostd::get<double>(actual.sum_));
ASSERT_EQ(11, actual.count_);
}
#endif
Loading

0 comments on commit 998badd

Please sign in to comment.