Skip to content

Commit fca491d

Browse files
committed
[core] fix ray_event_recorder initialization
Signed-off-by: Cuong Nguyen <can@anyscale.com>
1 parent fe7ad00 commit fca491d

File tree

5 files changed

+14
-5
lines changed

5 files changed

+14
-5
lines changed

src/ray/gcs/gcs_server.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ void GcsServer::DoStart(const GcsInitData &gcs_init_data) {
296296
// Init metrics and event exporter.
297297
metrics_agent_client_->WaitForServerReady([this](const Status &server_status) {
298298
stats::InitOpenTelemetryExporter(config_.metrics_agent_port, server_status);
299-
ray_event_recorder_->StartExportingEvents();
299+
ray_event_recorder_->StartExportingEvents(server_status);
300300
});
301301

302302
// Start RPC server when all tables have finished loading initial

src/ray/observability/fake_ray_event_recorder.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ namespace observability {
2424

2525
class FakeRayEventRecorder : public RayEventRecorderInterface {
2626
public:
27-
void StartExportingEvents() override {}
27+
void StartExportingEvents(const Status &aggregator_agent_status) override {}
2828
void AddEvents(std::vector<std::unique_ptr<RayEventInterface>> &&data_list) override {
2929
absl::MutexLock lock(&mutex_);
3030
buffer_.insert(buffer_.end(),

src/ray/observability/ray_event_recorder.cc

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,22 @@ RayEventRecorder::RayEventRecorder(
3434
buffer_(max_buffer_size),
3535
dropped_events_counter_(dropped_events_counter) {}
3636

37-
void RayEventRecorder::StartExportingEvents() {
37+
void RayEventRecorder::StartExportingEvents(const Status &aggregator_agent_status) {
3838
absl::MutexLock lock(&mutex_);
39+
if (!aggregator_agent_status.ok()) {
40+
RAY_LOG(ERROR) << "Failed to establish connection to the event aggregator agent. "
41+
<< "Error: " << aggregator_agent_status.ToString();
42+
return;
43+
}
3944
RAY_CHECK(!exporting_started_)
4045
<< "RayEventRecorder::StartExportingEvents() should be called only once.";
4146
exporting_started_ = true;
4247
periodical_runner_->RunFnPeriodically(
4348
[this]() { ExportEvents(); },
4449
RayConfig::instance().ray_events_report_interval_ms(),
4550
"RayEventRecorder.ExportEvents");
51+
RAY_LOG(INFO) << "Successfully established connection to the event aggregator agent. "
52+
"Events will start being exported.";
4653
}
4754

4855
void RayEventRecorder::ExportEvents() {

src/ray/observability/ray_event_recorder_interface.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include <memory>
1818
#include <vector>
1919

20+
#include "ray/common/status.h"
2021
#include "ray/observability/ray_event_interface.h"
2122

2223
namespace ray {
@@ -29,7 +30,7 @@ class RayEventRecorderInterface {
2930
// Start exporting events to the event aggregator by periodically sending events to
3031
// the event aggregator. This should be called only once. Subsequent calls will be
3132
// ignored.
32-
virtual void StartExportingEvents() = 0;
33+
virtual void StartExportingEvents(const Status &aggregator_agent_status) = 0;
3334

3435
// Add a vector of data to the internal buffer. Data in the buffer will be sent to
3536
// the event aggregator periodically.

src/ray/rpc/metrics_agent_client.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ void MetricsAgentClientImpl::WaitForServerReadyWithRetry(
5050
}
5151
});
5252
if (retry_count >= max_retry) {
53-
init_exporter_fn(Status::RpcError("The metrics agent server is not ready.", 14));
53+
init_exporter_fn(
54+
Status::RpcError("Running out of retries to initialize the metrics agent.", 14));
5455
return;
5556
}
5657
retry_count++;

0 commit comments

Comments
 (0)