Skip to content

Commit 2be8a2b

Browse files
committed
[core][event] fix "RayEventRecorder::StartExportingEvents() should be called only once."
Signed-off-by: Cuong Nguyen <can@anyscale.com>
1 parent fca491d commit 2be8a2b

File tree

6 files changed

+39
-41
lines changed

6 files changed

+39
-41
lines changed

src/ray/gcs/gcs_server.cc

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,13 @@ void GcsServer::DoStart(const GcsInitData &gcs_init_data) {
296296
// Init metrics and event exporter.
297297
metrics_agent_client_->WaitForServerReady([this](const Status &server_status) {
298298
stats::InitOpenTelemetryExporter(config_.metrics_agent_port, server_status);
299-
ray_event_recorder_->StartExportingEvents(server_status);
299+
if (server_status.ok()) {
300+
ray_event_recorder_->StartExportingEvents();
301+
} else {
302+
RAY_LOG(ERROR) << "Failed to establish connection to the event exporter. Events "
303+
"will not be exported. "
304+
<< "Event exporter status: " << server_status.ToString();
305+
}
300306
});
301307

302308
// Start RPC server when all tables have finished loading initial

src/ray/observability/fake_ray_event_recorder.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ namespace observability {
2424

2525
class FakeRayEventRecorder : public RayEventRecorderInterface {
2626
public:
27-
void StartExportingEvents(const Status &aggregator_agent_status) override {}
27+
void StartExportingEvents() override {}
2828
void AddEvents(std::vector<std::unique_ptr<RayEventInterface>> &&data_list) override {
2929
absl::MutexLock lock(&mutex_);
3030
buffer_.insert(buffer_.end(),

src/ray/observability/ray_event_recorder.cc

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,22 +34,15 @@ RayEventRecorder::RayEventRecorder(
3434
buffer_(max_buffer_size),
3535
dropped_events_counter_(dropped_events_counter) {}
3636

37-
void RayEventRecorder::StartExportingEvents(const Status &aggregator_agent_status) {
37+
void RayEventRecorder::StartExportingEvents() {
3838
absl::MutexLock lock(&mutex_);
39-
if (!aggregator_agent_status.ok()) {
40-
RAY_LOG(ERROR) << "Failed to establish connection to the event aggregator agent. "
41-
<< "Error: " << aggregator_agent_status.ToString();
42-
return;
43-
}
4439
RAY_CHECK(!exporting_started_)
4540
<< "RayEventRecorder::StartExportingEvents() should be called only once.";
4641
exporting_started_ = true;
4742
periodical_runner_->RunFnPeriodically(
4843
[this]() { ExportEvents(); },
4944
RayConfig::instance().ray_events_report_interval_ms(),
5045
"RayEventRecorder.ExportEvents");
51-
RAY_LOG(INFO) << "Successfully established connection to the event aggregator agent. "
52-
"Events will start being exported.";
5346
}
5447

5548
void RayEventRecorder::ExportEvents() {

src/ray/observability/ray_event_recorder_interface.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
#include <memory>
1818
#include <vector>
1919

20-
#include "ray/common/status.h"
2120
#include "ray/observability/ray_event_interface.h"
2221

2322
namespace ray {
@@ -30,7 +29,7 @@ class RayEventRecorderInterface {
3029
// Start exporting events to the event aggregator by periodically sending events to
3130
// the event aggregator. This should be called only once. Subsequent calls will be
3231
// ignored.
33-
virtual void StartExportingEvents(const Status &aggregator_agent_status) = 0;
32+
virtual void StartExportingEvents() = 0;
3433

3534
// Add a vector of data to the internal buffer. Data in the buffer will be sent to
3635
// the event aggregator periodically.

src/ray/rpc/metrics_agent_client.cc

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -41,31 +41,31 @@ void MetricsAgentClientImpl::WaitForServerReadyWithRetry(
4141
// Only log the first time we start the retry loop.
4242
RAY_LOG(INFO) << "Initializing exporter ...";
4343
}
44-
HealthCheck(rpc::HealthCheckRequest(),
45-
[this, init_exporter_fn](auto &status, auto &&reply) {
46-
if (status.ok() && !exporter_initialized_) {
47-
init_exporter_fn(status);
48-
exporter_initialized_ = true;
49-
RAY_LOG(INFO) << "Exporter initialized.";
50-
}
51-
});
52-
if (retry_count >= max_retry) {
53-
init_exporter_fn(
54-
Status::RpcError("Running out of retries to initialize the metrics agent.", 14));
55-
return;
56-
}
57-
retry_count++;
58-
retry_timer_->expires_after(std::chrono::milliseconds(retry_interval_ms));
59-
retry_timer_->async_wait(
60-
[this, init_exporter_fn, retry_count, max_retry, retry_interval_ms](
61-
const boost::system::error_code &error) {
62-
if (!error) {
63-
WaitForServerReadyWithRetry(
64-
init_exporter_fn, retry_count, max_retry, retry_interval_ms);
65-
} else {
66-
RAY_LOG(ERROR) << "Failed to initialize exporter. Data will not be exported to "
67-
"the metrics agent.";
44+
HealthCheck(
45+
rpc::HealthCheckRequest(),
46+
[this, init_exporter_fn, retry_count, max_retry, retry_interval_ms](auto &status,
47+
auto &&reply) {
48+
if (status.ok()) {
49+
if (exporter_initialized_) {
50+
return;
51+
}
52+
init_exporter_fn(status);
53+
exporter_initialized_ = true;
54+
RAY_LOG(INFO) << "Exporter initialized.";
55+
return;
56+
}
57+
if (retry_count >= max_retry) {
58+
init_exporter_fn(Status::RpcError(
59+
"Running out of retries to initialize the metrics agent.", 14));
60+
return;
6861
}
62+
io_service_.post(
63+
[this, init_exporter_fn, retry_count, max_retry, retry_interval_ms]() {
64+
WaitForServerReadyWithRetry(
65+
init_exporter_fn, retry_count + 1, max_retry, retry_interval_ms);
66+
},
67+
"MetricsAgentClient.WaitForServerReadyWithRetry",
68+
retry_interval_ms * 1000);
6969
});
7070
}
7171

src/ray/rpc/metrics_agent_client.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,12 @@ class MetricsAgentClientImpl : public MetricsAgentClient {
7272
MetricsAgentClientImpl(const std::string &address,
7373
const int port,
7474
instrumented_io_context &io_service,
75-
rpc::ClientCallManager &client_call_manager) {
75+
rpc::ClientCallManager &client_call_manager)
76+
: io_service_(io_service) {
7677
RAY_LOG(DEBUG) << "Initiate the metrics client of address:"
7778
<< BuildAddress(address, port);
7879
grpc_client_ =
7980
std::make_unique<GrpcClient<ReporterService>>(address, port, client_call_manager);
80-
retry_timer_ = std::make_unique<boost::asio::steady_timer>(io_service);
8181
};
8282

8383
VOID_RPC_CLIENT_METHOD(ReporterService,
@@ -89,7 +89,7 @@ class MetricsAgentClientImpl : public MetricsAgentClient {
8989
VOID_RPC_CLIENT_METHOD(ReporterService,
9090
HealthCheck,
9191
grpc_client_,
92-
/*method_timeout_ms*/ -1,
92+
/*method_timeout_ms*/ kMetricAgentInitRetryDelayMs,
9393
override)
9494

9595
/// Wait for the server to be ready. Invokes the callback with the final readiness
@@ -99,8 +99,8 @@ class MetricsAgentClientImpl : public MetricsAgentClient {
9999
private:
100100
/// The RPC client.
101101
std::unique_ptr<GrpcClient<ReporterService>> grpc_client_;
102-
/// Timer for retrying to initialize the OpenTelemetry exporter.
103-
std::unique_ptr<boost::asio::steady_timer> retry_timer_;
102+
/// The io context to run the retry loop.
103+
instrumented_io_context &io_service_;
104104
/// Whether the exporter is initialized.
105105
bool exporter_initialized_ = false;
106106
/// Wait for the server to be ready with a retry count. Invokes the callback

0 commit comments

Comments
 (0)