Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/ray/core_worker/core_worker_process.cc
Original file line number Diff line number Diff line change
Expand Up @@ -819,7 +819,13 @@ CoreWorkerProcessImpl::CoreWorkerProcessImpl(const CoreWorkerOptions &options)
io_service_,
*write_locked.Get()->client_call_manager_);
metrics_agent_client_->WaitForServerReady([this](const Status &server_status) {
stats::InitOpenTelemetryExporter(options_.metrics_agent_port, server_status);
if (server_status.ok()) {
stats::InitOpenTelemetryExporter(options_.metrics_agent_port);
} else {
RAY_LOG(ERROR) << "Failed to establish connection to the metrics exporter agent. "
"Metrics will not be exported. "
<< "Exporter agent status: " << server_status.ToString();
}
});
}
}
Expand Down
9 changes: 5 additions & 4 deletions src/ray/gcs/gcs_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -295,13 +295,14 @@ void GcsServer::DoStart(const GcsInitData &gcs_init_data) {

// Init metrics and event exporter.
metrics_agent_client_->WaitForServerReady([this](const Status &server_status) {
stats::InitOpenTelemetryExporter(config_.metrics_agent_port, server_status);
if (server_status.ok()) {
stats::InitOpenTelemetryExporter(config_.metrics_agent_port);
ray_event_recorder_->StartExportingEvents();
} else {
RAY_LOG(ERROR) << "Failed to establish connection to the event exporter. Events "
"will not be exported. "
<< "Event exporter status: " << server_status.ToString();
RAY_LOG(ERROR)
<< "Failed to establish connection to the event+metrics exporter agent. "
"Events and metrics will not be exported. "
<< "Exporter agent status: " << server_status.ToString();
}
});

Expand Down
14 changes: 10 additions & 4 deletions src/ray/raylet/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -959,10 +959,16 @@ int main(int argc, char *argv[]) {
ray::stats::Init(global_tags, metrics_agent_port, ray::WorkerID::Nil());
metrics_agent_client = std::make_unique<ray::rpc::MetricsAgentClientImpl>(
"127.0.0.1", metrics_agent_port, main_service, *client_call_manager);
metrics_agent_client->WaitForServerReady(
[metrics_agent_port](const ray::Status &server_status) {
ray::stats::InitOpenTelemetryExporter(metrics_agent_port, server_status);
});
metrics_agent_client->WaitForServerReady([metrics_agent_port](
const ray::Status &server_status) {
if (server_status.ok()) {
ray::stats::InitOpenTelemetryExporter(metrics_agent_port);
} else {
RAY_LOG(ERROR) << "Failed to establish connection to the metrics exporter agent. "
"Metrics will not be exported. "
<< "Exporter agent status: " << server_status.ToString();
}
});

// Initialize event framework. This should be done after the node manager is
// initialized.
Expand Down
9 changes: 1 addition & 8 deletions src/ray/stats/stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,17 +113,10 @@ static inline void Init(
StatsConfig::instance().SetIsInitialized(true);
}

static inline void InitOpenTelemetryExporter(const int metrics_agent_port,
const Status &metrics_agent_server_status) {
static inline void InitOpenTelemetryExporter(const int metrics_agent_port) {
if (!RayConfig::instance().enable_open_telemetry()) {
return;
}
if (!metrics_agent_server_status.ok()) {
RAY_LOG(ERROR) << "Failed to initialize OpenTelemetry exporter. Data will not be "
"exported to the "
<< "metrics agent. Server status: " << metrics_agent_server_status;
return;
}
OpenTelemetryMetricRecorder::GetInstance().RegisterGrpcExporter(
/*endpoint=*/std::string("127.0.0.1:") + std::to_string(metrics_agent_port),
/*interval=*/
Expand Down