Skip to content

Commit

Permalink
Add MeterContext::ForEachMeter() method to process callbacks on Meter…
Browse files Browse the repository at this point in the history
… in thread-safe manner (open-telemetry#1783)
  • Loading branch information
lalitb authored Nov 17, 2022
1 parent d0571d8 commit 8f778b7
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 9 deletions.
16 changes: 12 additions & 4 deletions sdk/include/opentelemetry/sdk/metrics/meter_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,15 @@ class MeterContext : public std::enable_shared_from_this<MeterContext>
ViewRegistry *GetViewRegistry() const noexcept;

/**
* Obtain the configured meters.
* NOTE - INTERNAL method, can change in future.
* Process callback for each meter in thread-safe manner
*/
bool ForEachMeter(nostd::function_ref<bool(std::shared_ptr<Meter> &meter)> callback) noexcept;

/**
* NOTE - INTERNAL method, can change in future.
* Get the configured meters.
* This method is NOT thread safe, and only called through MeterProvider
*
*/
nostd::span<std::shared_ptr<Meter>> GetMeters() noexcept;
Expand Down Expand Up @@ -96,8 +104,8 @@ class MeterContext : public std::enable_shared_from_this<MeterContext>
std::unique_ptr<View> view) noexcept;

/**
* Adds a meter to the list of configured meters.
* Note: This method is INTERNAL to sdk not thread safe.
* NOTE - INTERNAL method, can change in future.
* Adds a meter to the list of configured meters in thread safe manner.
*
* @param meter
*/
Expand All @@ -124,7 +132,7 @@ class MeterContext : public std::enable_shared_from_this<MeterContext>

std::atomic_flag shutdown_latch_ = ATOMIC_FLAG_INIT;
opentelemetry::common::SpinLockMutex forceflush_lock_;
opentelemetry::common::SpinLockMutex storage_lock_;
opentelemetry::common::SpinLockMutex meter_lock_;
};

} // namespace metrics
Expand Down
18 changes: 16 additions & 2 deletions sdk/src/metrics/meter_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,23 @@ ViewRegistry *MeterContext::GetViewRegistry() const noexcept
return views_.get();
}

bool MeterContext::ForEachMeter(
nostd::function_ref<bool(std::shared_ptr<Meter> &meter)> callback) noexcept
{
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(meter_lock_);
for (auto &meter : meters_)
{
if (!callback(meter))
{
return false;
}
}
return true;
}

nostd::span<std::shared_ptr<Meter>> MeterContext::GetMeters() noexcept
{
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(storage_lock_);
// no lock required, as this is called by MeterProvider in thread-safe manner.
return nostd::span<std::shared_ptr<Meter>>{meters_};
}

Expand Down Expand Up @@ -59,7 +73,7 @@ void MeterContext::AddView(std::unique_ptr<InstrumentSelector> instrument_select

void MeterContext::AddMeter(std::shared_ptr<Meter> meter)
{
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(storage_lock_);
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(meter_lock_);
meters_.push_back(meter);
}

Expand Down
6 changes: 3 additions & 3 deletions sdk/src/metrics/state/metric_collector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ bool MetricCollector::Collect(
return false;
}
ResourceMetrics resource_metrics;
for (auto &meter : meter_context_->GetMeters())
{
meter_context_->ForEachMeter([&](std::shared_ptr<Meter> meter) noexcept {
auto collection_ts = std::chrono::system_clock::now();
ScopeMetrics scope_metrics;
scope_metrics.metric_data_ = meter->Collect(this, collection_ts);
scope_metrics.scope_ = meter->GetInstrumentationScope();
resource_metrics.scope_metric_data_.push_back(scope_metrics);
}
return true;
});
resource_metrics.resource_ = &meter_context_->GetResource();
callback(resource_metrics);
return true;
Expand Down

0 comments on commit 8f778b7

Please sign in to comment.