Skip to content

Commit

Permalink
Fix open-telemetry#1663 Threading issue between Meter::RegisterSyncMe…
Browse files Browse the repository at this point in the history
…tricStorage and Meter::Collect (open-telemetry#1666)
  • Loading branch information
lalitb authored Oct 11, 2022
1 parent 7140166 commit 99f2ba4
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 2 deletions.
1 change: 1 addition & 0 deletions sdk/include/opentelemetry/sdk/metrics/meter.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ class Meter final : public opentelemetry::metrics::Meter
InstrumentDescriptor &instrument_descriptor);
std::unique_ptr<AsyncWritableMetricStorage> RegisterAsyncMetricStorage(
InstrumentDescriptor &instrument_descriptor);
opentelemetry::common::SpinLockMutex storage_lock_;
};
} // namespace metrics
} // namespace sdk
Expand Down
3 changes: 3 additions & 0 deletions sdk/src/metrics/meter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ const sdk::instrumentationscope::InstrumentationScope *Meter::GetInstrumentation
std::unique_ptr<SyncWritableMetricStorage> Meter::RegisterSyncMetricStorage(
InstrumentDescriptor &instrument_descriptor)
{
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(storage_lock_);
auto ctx = meter_context_.lock();
if (!ctx)
{
Expand Down Expand Up @@ -251,6 +252,7 @@ std::unique_ptr<SyncWritableMetricStorage> Meter::RegisterSyncMetricStorage(
std::unique_ptr<AsyncWritableMetricStorage> Meter::RegisterAsyncMetricStorage(
InstrumentDescriptor &instrument_descriptor)
{
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(storage_lock_);
auto ctx = meter_context_.lock();
if (!ctx)
{
Expand Down Expand Up @@ -302,6 +304,7 @@ std::vector<MetricData> Meter::Collect(CollectorHandle *collector,
<< "The metric context is invalid");
return std::vector<MetricData>{};
}
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(storage_lock_);
for (auto &metric_storage : storage_registry_)
{
metric_storage.second->Collect(collector, ctx->GetCollectors(), ctx->GetSDKStartTime(),
Expand Down
64 changes: 62 additions & 2 deletions sdk/test/metrics/meter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ class MockMetricReader : public MetricReader

namespace
{
nostd::shared_ptr<metrics::Meter> InitMeter(MetricReader **metricReaderPtr)
nostd::shared_ptr<metrics::Meter> InitMeter(MetricReader **metricReaderPtr,
std::string meter_name = "meter_name")
{
static std::shared_ptr<metrics::MeterProvider> provider(new MeterProvider());
std::unique_ptr<MetricReader> metric_reader(new MockMetricReader());
*metricReaderPtr = metric_reader.get();
auto p = std::static_pointer_cast<MeterProvider>(provider);
p->AddMetricReader(std::move(metric_reader));
auto meter = provider->GetMeter("meter_name");
auto meter = provider->GetMeter(meter_name);
return meter;
}
} // namespace
Expand Down Expand Up @@ -70,6 +71,65 @@ TEST(MeterTest, BasicAsyncTests)
}
return true;
});
observable_counter->RemoveCallback(asyc_generate_measurements, nullptr);
}

constexpr static unsigned MAX_THREADS = 25;
constexpr static unsigned MAX_ITERATIONS_MT = 1000;

TEST(MeterTest, StressMultiThread)
{
MetricReader *metric_reader_ptr = nullptr;
auto meter = InitMeter(&metric_reader_ptr, "stress_test_meter");
std::atomic<unsigned> threadCount(0);
size_t numIterations = MAX_ITERATIONS_MT;
std::atomic<bool> do_collect{false}, do_sync_create{true}, do_async_create{false};
std::vector<nostd::shared_ptr<opentelemetry::metrics::ObservableInstrument>>
observable_instruments;
std::vector<std::thread> meter_operation_threads;
size_t instrument_id = 0;
while (numIterations--)
{
for (size_t i = 0; i < MAX_THREADS; i++)
{
if (threadCount++ < MAX_THREADS)
{
auto t = std::thread([&]() {
std::this_thread::yield();
if (do_sync_create.exchange(false))
{
std::string instrument_name = "test_couter_" + std::to_string(instrument_id);
meter->CreateLongCounter(instrument_name, "", "");
do_async_create.store(true);
instrument_id++;
}
if (do_async_create.exchange(false))
{
std::cout << "\n creating async thread " << std::to_string(numIterations);
auto observable_instrument =
meter->CreateLongObservableGauge("test_gauge_" + std::to_string(instrument_id));
observable_instrument->AddCallback(asyc_generate_measurements, nullptr);
observable_instruments.push_back(std::move(observable_instrument));
do_collect.store(true);
instrument_id++;
}
if (do_collect.exchange(false))
{
metric_reader_ptr->Collect([](ResourceMetrics &metric_data) { return true; });
do_sync_create.store(true);
}
});
meter_operation_threads.push_back(std::move(t));
}
}
}
for (auto &t : meter_operation_threads)
{
if (t.joinable())
{
t.join();
}
}
}

#endif

0 comments on commit 99f2ba4

Please sign in to comment.