Skip to content

Commit

Permalink
Fix #1632 - Occasional Segfault with LongCounter instrument (#1638)
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb authored Sep 29, 2022
1 parent 30cc1bc commit 9e87a6e
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 23 deletions.
16 changes: 12 additions & 4 deletions sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora
opentelemetry::common::SystemTimestamp /* observation_time */) noexcept
{
// process the read measurements - aggregate and store in hashmap
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(hashmap_lock_);
for (auto &measurement : measurements)
{
auto aggr = DefaultAggregation::CreateAggregation(aggregation_type_, instrument_descriptor_);
Expand Down Expand Up @@ -96,10 +97,16 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora
nostd::function_ref<bool(MetricData)> metric_collection_callback) noexcept override
{

auto status = temporal_metric_storage_.buildMetrics(collector, collectors, sdk_start_ts,
collection_ts, std::move(delta_hash_map_),
metric_collection_callback);
delta_hash_map_.reset(new AttributesHashMap());
std::shared_ptr<AttributesHashMap> delta_metrics = nullptr;
{
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(hashmap_lock_);
delta_metrics = std::move(delta_hash_map_);
delta_hash_map_.reset(new AttributesHashMap);
}

auto status =
temporal_metric_storage_.buildMetrics(collector, collectors, sdk_start_ts, collection_ts,
delta_metrics, metric_collection_callback);
return status;
}

Expand All @@ -110,6 +117,7 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora
void *state_;
std::unique_ptr<AttributesHashMap> cumulative_hash_map_;
std::unique_ptr<AttributesHashMap> delta_hash_map_;
opentelemetry::common::SpinLockMutex hashmap_lock_;
TemporalMetricStorage temporal_metric_storage_;
};

Expand Down
16 changes: 1 addition & 15 deletions sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

#pragma once
#ifndef ENABLE_METRICS_PREVIEW
# include "opentelemetry/common/spin_lock_mutex.h"
# include "opentelemetry/nostd/function_ref.h"
# include "opentelemetry/sdk/common/attribute_utils.h"
# include "opentelemetry/sdk/common/attributemap_hash.h"
Expand All @@ -13,7 +12,6 @@

# include <functional>
# include <memory>
# include <mutex>
# include <unordered_map>

OPENTELEMETRY_BEGIN_NAMESPACE
Expand All @@ -37,7 +35,6 @@ class AttributesHashMap
public:
Aggregation *Get(const MetricAttributes &attributes) const
{
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(lock_);
auto it = hash_map_.find(attributes);
if (it != hash_map_.end())
{
Expand All @@ -52,7 +49,6 @@ class AttributesHashMap
*/
bool Has(const MetricAttributes &attributes) const
{
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(lock_);
return (hash_map_.find(attributes) == hash_map_.end()) ? false : true;
}

Expand All @@ -64,8 +60,6 @@ class AttributesHashMap
Aggregation *GetOrSetDefault(const MetricAttributes &attributes,
std::function<std::unique_ptr<Aggregation>()> aggregation_callback)
{
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(lock_);

auto it = hash_map_.find(attributes);
if (it != hash_map_.end())
{
Expand All @@ -81,7 +75,6 @@ class AttributesHashMap
*/
void Set(const MetricAttributes &attributes, std::unique_ptr<Aggregation> value)
{
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(lock_);
hash_map_[attributes] = std::move(value);
}

Expand All @@ -91,7 +84,6 @@ class AttributesHashMap
bool GetAllEnteries(
nostd::function_ref<bool(const MetricAttributes &, Aggregation &)> callback) const
{
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(lock_);
for (auto &kv : hash_map_)
{
if (!callback(kv.first, *(kv.second.get())))
Expand All @@ -105,17 +97,11 @@ class AttributesHashMap
/**
* Return the size of hash.
*/
size_t Size()
{
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(lock_);
return hash_map_.size();
}
size_t Size() { return hash_map_.size(); }

private:
std::unordered_map<MetricAttributes, std::unique_ptr<Aggregation>, AttributeHashGenerator>
hash_map_;

mutable opentelemetry::common::SpinLockMutex lock_;
};
} // namespace metrics

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage
return;
}
exemplar_reservoir_->OfferMeasurement(value, {}, context, std::chrono::system_clock::now());
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(attribute_hashmap_lock_);
attributes_hashmap_->GetOrSetDefault({}, create_default_aggregation_)->Aggregate(value);
}

Expand All @@ -67,6 +68,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage
exemplar_reservoir_->OfferMeasurement(value, attributes, context,
std::chrono::system_clock::now());
auto attr = attributes_processor_->process(attributes);
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(attribute_hashmap_lock_);
attributes_hashmap_->GetOrSetDefault(attr, create_default_aggregation_)->Aggregate(value);
}

Expand All @@ -77,6 +79,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage
return;
}
exemplar_reservoir_->OfferMeasurement(value, {}, context, std::chrono::system_clock::now());
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(attribute_hashmap_lock_);
attributes_hashmap_->GetOrSetDefault({}, create_default_aggregation_)->Aggregate(value);
}

Expand All @@ -93,6 +96,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage
exemplar_reservoir_->OfferMeasurement(value, attributes, context,
std::chrono::system_clock::now());
auto attr = attributes_processor_->process(attributes);
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(attribute_hashmap_lock_);
attributes_hashmap_->GetOrSetDefault(attr, create_default_aggregation_)->Aggregate(value);
}

Expand All @@ -117,6 +121,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage
std::function<std::unique_ptr<Aggregation>()> create_default_aggregation_;
nostd::shared_ptr<ExemplarReservoir> exemplar_reservoir_;
TemporalMetricStorage temporal_metric_storage_;
opentelemetry::common::SpinLockMutex attribute_hashmap_lock_;
};

} // namespace metrics
Expand Down
8 changes: 6 additions & 2 deletions sdk/src/metrics/state/sync_metric_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@ bool SyncMetricStorage::Collect(CollectorHandle *collector,
// Add the current delta metrics to `unreported metrics stash` for all the collectors,
// this will also empty the delta metrics hashmap, and make it available for
// recordings
std::shared_ptr<AttributesHashMap> delta_metrics = std::move(attributes_hashmap_);
attributes_hashmap_.reset(new AttributesHashMap);
std::shared_ptr<AttributesHashMap> delta_metrics = nullptr;
{
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(attribute_hashmap_lock_);
delta_metrics = std::move(attributes_hashmap_);
attributes_hashmap_.reset(new AttributesHashMap);
}

return temporal_metric_storage_.buildMetrics(collector, collectors, sdk_start_ts, collection_ts,
std::move(delta_metrics), callback);
Expand Down
7 changes: 5 additions & 2 deletions sdk/test/metrics/attributes_hashmap_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,17 @@ void BM_AttributseHashMap(benchmark::State &state)
std::vector<MetricAttributes> attributes = {{{"k1", "v1"}, {"k2", "v2"}},
{{"k1", "v1"}, {"k2", "v2"}, {"k3", "v3"}}};

auto work = [&attributes, &hash_map](const size_t i) {
std::mutex m;

auto work = [&attributes, &hash_map, &m](const size_t i) {
std::function<std::unique_ptr<Aggregation>()> create_default_aggregation =
[]() -> std::unique_ptr<Aggregation> {
return std::unique_ptr<Aggregation>(new DropAggregation);
};

m.lock();
hash_map.GetOrSetDefault(attributes[i % 2], create_default_aggregation)->Aggregate(1l);
benchmark::DoNotOptimize(hash_map.Has(attributes[i % 2]));
m.unlock();
};
while (state.KeepRunning())
{
Expand Down

0 comments on commit 9e87a6e

Please sign in to comment.