Skip to content

Commit f254406

Browse files
committed
[core][telemetry/11] support histogram metric on worker side
Signed-off-by: can <can@anyscale.com>
Signed-off-by: Cuong Nguyen <can@anyscale.com>
1 parent 070310e commit f254406

File tree

5 files changed: 75 additions (+75) and 81 deletions (-81)

src/ray/stats/metric.cc

Lines changed: 21 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -119,29 +119,27 @@ void Metric::Record(double value, TagsType tags) {
119119
// construction time to avoid static initialization order issues. This function
120120
// is thread-safe.
121121
RegisterOpenTelemetryMetric();
122-
if (OpenTelemetryMetricRecorder::GetInstance().IsMetricRegistered(name_)) {
123-
// Collect tags from both the metric-specific tags and the global tags.
124-
absl::flat_hash_map<std::string, std::string> open_telemetry_tags;
125-
std::unordered_set<std::string> tag_keys_set;
126-
for (const auto &tag_key : tag_keys_) {
127-
tag_keys_set.insert(tag_key.name());
128-
}
129-
// Insert metric-specific tags that match the expected keys.
130-
for (const auto &tag : tags) {
131-
const std::string &key = tag.first.name();
132-
if (tag_keys_set.count(key)) {
133-
open_telemetry_tags[key] = tag.second;
134-
}
135-
}
136-
// Add global tags, overwriting any existing tag keys.
137-
for (const auto &tag : StatsConfig::instance().GetGlobalTags()) {
138-
open_telemetry_tags[tag.first.name()] = tag.second;
122+
// Collect tags from both the metric-specific tags and the global tags.
123+
absl::flat_hash_map<std::string, std::string> open_telemetry_tags;
124+
std::unordered_set<std::string> tag_keys_set;
125+
for (const auto &tag_key : tag_keys_) {
126+
tag_keys_set.insert(tag_key.name());
127+
}
128+
// Insert metric-specific tags that match the expected keys.
129+
for (const auto &tag : tags) {
130+
const std::string &key = tag.first.name();
131+
if (tag_keys_set.count(key)) {
132+
open_telemetry_tags[key] = tag.second;
139133
}
140-
OpenTelemetryMetricRecorder::GetInstance().SetMetricValue(
141-
name_, std::move(open_telemetry_tags), value);
142-
143-
return;
144134
}
135+
// Add global tags, overwriting any existing tag keys.
136+
for (const auto &tag : StatsConfig::instance().GetGlobalTags()) {
137+
open_telemetry_tags[tag.first.name()] = tag.second;
138+
}
139+
OpenTelemetryMetricRecorder::GetInstance().SetMetricValue(
140+
name_, std::move(open_telemetry_tags), value);
141+
142+
return;
145143
}
146144

147145
absl::MutexLock lock(&registration_mutex_);
@@ -196,11 +194,6 @@ void Gauge::RegisterOpenTelemetryMetric() {
196194
}
197195

198196
void Gauge::RegisterView() {
199-
if (::RayConfig::instance().experimental_enable_open_telemetry_on_core()) {
200-
// Register the metric in OpenTelemetry.
201-
OpenTelemetryMetricRecorder::GetInstance().RegisterGaugeMetric(name_, description_);
202-
return;
203-
}
204197
opencensus::stats::ViewDescriptor view_descriptor =
205198
opencensus::stats::ViewDescriptor()
206199
.set_name(name_)
@@ -211,14 +204,11 @@ void Gauge::RegisterView() {
211204
}
212205

213206
void Histogram::RegisterOpenTelemetryMetric() {
214-
// Histogram is not supported in OpenTelemetry.
215-
return;
207+
OpenTelemetryMetricRecorder::GetInstance().RegisterHistogramMetric(
208+
name_, description_, boundaries_);
216209
}
217210

218211
void Histogram::RegisterView() {
219-
if (measure_ == nullptr) {
220-
return;
221-
}
222212
opencensus::stats::ViewDescriptor view_descriptor =
223213
opencensus::stats::ViewDescriptor()
224214
.set_name(name_)
@@ -235,11 +225,6 @@ void Count::RegisterOpenTelemetryMetric() {
235225
}
236226

237227
void Count::RegisterView() {
238-
if (::RayConfig::instance().experimental_enable_open_telemetry_on_core()) {
239-
// Register the metric in OpenTelemetry.
240-
OpenTelemetryMetricRecorder::GetInstance().RegisterCounterMetric(name_, description_);
241-
return;
242-
}
243228
opencensus::stats::ViewDescriptor view_descriptor =
244229
opencensus::stats::ViewDescriptor()
245230
.set_name(name_)
@@ -255,11 +240,6 @@ void Sum::RegisterOpenTelemetryMetric() {
255240
}
256241

257242
void Sum::RegisterView() {
258-
if (::RayConfig::instance().experimental_enable_open_telemetry_on_core()) {
259-
// Register the metric in OpenTelemetry.
260-
OpenTelemetryMetricRecorder::GetInstance().RegisterSumMetric(name_, description_);
261-
return;
262-
}
263243
opencensus::stats::ViewDescriptor view_descriptor =
264244
opencensus::stats::ViewDescriptor()
265245
.set_name(name_)

src/ray/stats/tests/metric_with_open_telemetry_test.cc

Lines changed: 0 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -105,44 +105,5 @@ TEST_F(MetricTest, TestSumMetric) {
105105
"legacy_metric_sum_test"));
106106
}
107107

108-
TEST_F(MetricTest, TestSumMetric) {
109-
ASSERT_TRUE(
110-
OpenTelemetryMetricRecorder::GetInstance().IsMetricRegistered("metric_sum_test"));
111-
// We only test that recording is not crashing. The actual value is not checked
112-
// because open telemetry does not provide a way to retrieve the value of a counter.
113-
// Checking value is performed via e2e tests instead (e.g., in test_metrics_agent.py).
114-
STATS_metric_sum_test.Record(200.0, {{"Tag1", "Value1"}, {"Tag2", "Value2"}});
115-
}
116-
117-
TEST_F(MetricTest, TestSumMetric) {
118-
ASSERT_TRUE(
119-
OpenTelemetryMetricRecorder::GetInstance().IsMetricRegistered("metric_sum_test"));
120-
// We only test that recording is not crashing. The actual value is not checked
121-
// because open telemetry does not provide a way to retrieve the value of a counter.
122-
// Checking value is performed via e2e tests instead (e.g., in test_metrics_agent.py).
123-
STATS_metric_sum_test.Record(200.0, {{"Tag1", "Value1"}, {"Tag2", "Value2"}});
124-
LegacyMetricSumTest.Record(200.0, {{"Tag1"sv, "Value1"}, {"Tag2"sv, "Value2"}});
125-
ASSERT_TRUE(OpenTelemetryMetricRecorder::GetInstance().IsMetricRegistered(
126-
"legacy_metric_sum_test"));
127-
}
128-
129-
TEST_F(MetricTest, TestCounterMetric) {
130-
ASSERT_TRUE(OpenTelemetryMetricRecorder::GetInstance().IsMetricRegistered(
131-
"metric_counter_test"));
132-
STATS_metric_counter_test.Record(100.0, {{"Tag1", "Value1"}, {"Tag2", "Value2"}});
133-
LegacyMetricCounterTest.Record(100.0, {{"Tag1"sv, "Value1"}, {"Tag2"sv, "Value2"}});
134-
ASSERT_TRUE(OpenTelemetryMetricRecorder::GetInstance().IsMetricRegistered(
135-
"legacy_metric_counter_test"));
136-
}
137-
138-
TEST_F(MetricTest, TestSumMetric) {
139-
ASSERT_TRUE(
140-
OpenTelemetryMetricRecorder::GetInstance().IsMetricRegistered("metric_sum_test"));
141-
STATS_metric_sum_test.Record(200.0, {{"Tag1", "Value1"}, {"Tag2", "Value2"}});
142-
LegacyMetricSumTest.Record(200.0, {{"Tag1"sv, "Value1"}, {"Tag2"sv, "Value2"}});
143-
ASSERT_TRUE(OpenTelemetryMetricRecorder::GetInstance().IsMetricRegistered(
144-
"legacy_metric_sum_test"));
145-
}
146-
147108
} // namespace telemetry
148109
} // namespace ray

src/ray/telemetry/open_telemetry_metric_recorder.cc

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,17 @@
1313
// limitations under the License.
1414
#include "ray/telemetry/open_telemetry_metric_recorder.h"
1515

16+
#include <opentelemetry/context/context.h>
1617
#include <opentelemetry/exporters/otlp/otlp_grpc_metric_exporter.h>
1718
#include <opentelemetry/metrics/provider.h>
1819
#include <opentelemetry/nostd/variant.h>
20+
#include <opentelemetry/sdk/metrics/aggregation/histogram_aggregation.h>
1921
#include <opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h>
2022
#include <opentelemetry/sdk/metrics/instruments.h>
23+
#include <opentelemetry/sdk/metrics/view/instrument_selector.h>
24+
#include <opentelemetry/sdk/metrics/view/meter_selector.h>
25+
#include <opentelemetry/sdk/metrics/view/view.h>
26+
#include <opentelemetry/sdk/metrics/view/view_registry.h>
2127

2228
#include <cassert>
2329
#include <utility>
@@ -165,6 +171,36 @@ void OpenTelemetryMetricRecorder::RegisterSumMetric(const std::string &name,
165171
registered_instruments_[name] = std::move(instrument);
166172
}
167173

174+
void OpenTelemetryMetricRecorder::RegisterHistogramMetric(
175+
const std::string &name,
176+
const std::string &description,
177+
const std::vector<double> &buckets) {
178+
std::lock_guard<std::mutex> lock(mutex_);
179+
if (registered_instruments_.contains(name)) {
180+
return; // Already registered
181+
}
182+
// Create a histogram instrument with explicit buckets
183+
auto aggregation_config =
184+
std::make_shared<opentelemetry::sdk::metrics::HistogramAggregationConfig>();
185+
aggregation_config->boundaries_ = buckets;
186+
auto view = std::make_unique<opentelemetry::sdk::metrics::View>(
187+
name,
188+
description,
189+
"",
190+
opentelemetry::sdk::metrics::AggregationType::kHistogram,
191+
aggregation_config);
192+
193+
auto instrument_selector =
194+
std::make_unique<opentelemetry::sdk::metrics::InstrumentSelector>(
195+
opentelemetry::sdk::metrics::InstrumentType::kHistogram, name, "");
196+
auto meter_selector =
197+
std::make_unique<opentelemetry::sdk::metrics::MeterSelector>(meter_name_, "", "");
198+
meter_provider_->AddView(
199+
std::move(instrument_selector), std::move(meter_selector), std::move(view));
200+
auto instrument = GetMeter()->CreateDoubleHistogram(name, description, "");
201+
registered_instruments_[name] = std::move(instrument);
202+
}
203+
168204
void OpenTelemetryMetricRecorder::SetMetricValue(
169205
const std::string &name,
170206
absl::flat_hash_map<std::string, std::string> &&tags,
@@ -222,6 +258,9 @@ void OpenTelemetryMetricRecorder::SetSynchronousMetricValue(
222258
} else if (auto *sum = dynamic_cast<opentelemetry::metrics::UpDownCounter<double> *>(
223259
sync_instr_ptr->get())) {
224260
sum->Add(value, std::move(tags));
261+
} else if (auto *histogram = dynamic_cast<opentelemetry::metrics::Histogram<double> *>(
262+
sync_instr_ptr->get())) {
263+
histogram->Record(value, std::move(tags), opentelemetry::context::Context());
225264
} else {
226265
// Unknown or unsupported instrument type
227266
RAY_CHECK(false) << "Unsupported synchronous instrument type for metric: " << name;

src/ray/telemetry/open_telemetry_metric_recorder.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@ class OpenTelemetryMetricRecorder {
6161
// Registers a sum metric with the given name and description
6262
void RegisterSumMetric(const std::string &name, const std::string &description);
6363

64+
// Registers a histogram metric with the given name, description, and buckets
65+
void RegisterHistogramMetric(const std::string &name,
66+
const std::string &description,
67+
const std::vector<double> &buckets);
68+
6469
// Check if a metric with the given name is registered.
6570
bool IsMetricRegistered(const std::string &name);
6671

@@ -110,6 +115,8 @@ class OpenTelemetryMetricRecorder {
110115
// Flag to indicate if the recorder is shutting down. This is used to make sure that
111116
// the recorder will only shutdown once.
112117
std::atomic<bool> is_shutdown_{false};
118+
// The name of the meter used for this recorder.
119+
const std::string meter_name_ = "ray";
113120

114121
void SetObservableMetricValue(const std::string &name,
115122
absl::flat_hash_map<std::string, std::string> &&tags,
@@ -123,7 +130,7 @@ class OpenTelemetryMetricRecorder {
123130
const std::string &name, const absl::flat_hash_map<std::string, std::string> &tags);
124131

125132
opentelemetry::nostd::shared_ptr<opentelemetry::metrics::Meter> GetMeter() {
126-
return meter_provider_->GetMeter("ray");
133+
return meter_provider_->GetMeter(meter_name_);
127134
}
128135

129136
// Declare the test class as a friend to allow access to private members for testing.

src/ray/telemetry/tests/open_telemetry_metric_recorder_test.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,5 +64,12 @@ TEST_F(OpenTelemetryMetricRecorderTest, TestSumMetric) {
6464
ASSERT_TRUE(recorder_.IsMetricRegistered("test_sum"));
6565
}
6666

67+
TEST_F(OpenTelemetryMetricRecorderTest, TestHistogramMetric) {
68+
recorder_.RegisterHistogramMetric(
69+
"test_histogram", "Test histogram description", {0.0, 10.0, 20.0, 30.0});
70+
// Check that the histogram metric is registered
71+
ASSERT_TRUE(recorder_.IsMetricRegistered("test_histogram"));
72+
}
73+
6774
} // namespace telemetry
6875
} // namespace ray

0 commit comments

Comments
 (0)