Skip to content

Commit 2a9967b

Browse files
edoakesYoussefEssDS
authored andcommitted
[core] Ensure client_call_manager_ outlives metrics_agent_client_ in core worker (ray-project#58315)
The `metrics_agent_client_` depends on `client_call_manager_`, but previously it was pulling out a reference to it from the core worker, which is not guaranteed to outlive the agent client. Modifying it to keep the `client_call_manager_` as a field of the `core_worker_process` instead. I think we may also need to drain any ongoing RPCs from the `metrics_agent_client_` on shutdown. Leaving that for a future PR. --------- Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
1 parent fc17875 commit 2a9967b

File tree

5 files changed

+17
-23
lines changed

5 files changed

+17
-23
lines changed

src/ray/core_worker/core_worker.cc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,6 @@ CoreWorker::CoreWorker(
289289
CoreWorkerOptions options,
290290
std::unique_ptr<WorkerContext> worker_context,
291291
instrumented_io_context &io_service,
292-
std::unique_ptr<rpc::ClientCallManager> client_call_manager,
293292
std::shared_ptr<rpc::CoreWorkerClientPool> core_worker_client_pool,
294293
std::shared_ptr<rpc::RayletClientPool> raylet_client_pool,
295294
std::shared_ptr<PeriodicalRunnerInterface> periodical_runner,
@@ -325,7 +324,6 @@ CoreWorker::CoreWorker(
325324
: nullptr),
326325
worker_context_(std::move(worker_context)),
327326
io_service_(io_service),
328-
client_call_manager_(std::move(client_call_manager)),
329327
core_worker_client_pool_(std::move(core_worker_client_pool)),
330328
raylet_client_pool_(std::move(raylet_client_pool)),
331329
periodical_runner_(std::move(periodical_runner)),

src/ray/core_worker/core_worker.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,6 @@ class CoreWorker {
176176
CoreWorker(CoreWorkerOptions options,
177177
std::unique_ptr<WorkerContext> worker_context,
178178
instrumented_io_context &io_service,
179-
std::unique_ptr<rpc::ClientCallManager> client_call_manager,
180179
std::shared_ptr<rpc::CoreWorkerClientPool> core_worker_client_pool,
181180
std::shared_ptr<rpc::RayletClientPool> raylet_client_pool,
182181
std::shared_ptr<PeriodicalRunnerInterface> periodical_runner,
@@ -1737,9 +1736,6 @@ class CoreWorker {
17371736
/// Event loop where the IO events are handled. e.g. async GCS operations.
17381737
instrumented_io_context &io_service_;
17391738

1740-
/// Shared client call manager.
1741-
std::unique_ptr<rpc::ClientCallManager> client_call_manager_;
1742-
17431739
/// Shared core worker client pool.
17441740
std::shared_ptr<rpc::CoreWorkerClientPool> core_worker_client_pool_;
17451741

src/ray/core_worker/core_worker_process.cc

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,6 @@ std::shared_ptr<CoreWorker> CoreWorkerProcess::TryGetWorker() {
132132
std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
133133
CoreWorkerOptions options, const WorkerID &worker_id) {
134134
/// Event loop where the IO events are handled. e.g. async GCS operations.
135-
auto client_call_manager = std::make_unique<rpc::ClientCallManager>(
136-
io_service_, /*record_stats=*/false, options.node_ip_address);
137135
auto periodical_runner = PeriodicalRunner::Create(io_service_);
138136
auto worker_context = std::make_unique<WorkerContext>(
139137
options.worker_type, worker_id, GetProcessJobID(options));
@@ -166,7 +164,7 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
166164
auto task_event_buffer = std::make_unique<worker::TaskEventBufferImpl>(
167165
std::make_unique<gcs::GcsClient>(options.gcs_options, options.node_ip_address),
168166
std::make_unique<rpc::EventAggregatorClientImpl>(options.metrics_agent_port,
169-
*client_call_manager),
167+
*client_call_manager_),
170168
options.session_name);
171169

172170
// Start the IO thread first to make sure the checker is working.
@@ -237,7 +235,7 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
237235
local_node_id, options.node_ip_address, options.node_manager_port);
238236
auto local_raylet_rpc_client =
239237
std::make_shared<rpc::RayletClient>(std::move(raylet_address),
240-
*client_call_manager,
238+
*client_call_manager_,
241239
/*raylet_unavailable_timeout_callback=*/[] {});
242240
auto core_worker_server =
243241
std::make_unique<rpc::GrpcServer>(WorkerTypeString(options.worker_type),
@@ -277,7 +275,7 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
277275
auto core_worker = GetCoreWorker();
278276
return std::make_shared<ray::rpc::RayletClient>(
279277
addr,
280-
*core_worker->client_call_manager_,
278+
*client_call_manager_,
281279
rpc::RayletClientPool::GetDefaultUnavailableTimeoutCallback(
282280
core_worker->gcs_client_.get(),
283281
core_worker->raylet_client_pool_.get(),
@@ -289,7 +287,7 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
289287
auto core_worker = GetCoreWorker();
290288
return std::make_shared<rpc::CoreWorkerClient>(
291289
addr,
292-
*core_worker->client_call_manager_,
290+
*client_call_manager_,
293291
rpc::CoreWorkerClientPool::GetDefaultUnavailableTimeoutCallback(
294292
core_worker->gcs_client_.get(),
295293
core_worker->core_worker_client_pool_.get(),
@@ -658,7 +656,6 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
658656
std::make_shared<CoreWorker>(std::move(options),
659657
std::move(worker_context),
660658
io_service_,
661-
std::move(client_call_manager),
662659
std::move(core_worker_client_pool),
663660
std::move(raylet_client_pool),
664661
std::move(periodical_runner),
@@ -696,6 +693,8 @@ CoreWorkerProcessImpl::CoreWorkerProcessImpl(const CoreWorkerOptions &options)
696693
? ComputeDriverIdFromJob(options_.job_id)
697694
: WorkerID::FromRandom()),
698695
io_work_(io_service_.get_executor()),
696+
client_call_manager_(std::make_unique<rpc::ClientCallManager>(
697+
io_service_, /*record_stats=*/false, options.node_ip_address)),
699698
task_execution_service_work_(task_execution_service_.get_executor()),
700699
service_handler_(std::make_unique<CoreWorkerServiceHandlerProxy>()) {
701700
if (options_.enable_logging) {
@@ -824,10 +823,7 @@ CoreWorkerProcessImpl::CoreWorkerProcessImpl(const CoreWorkerOptions &options)
824823
write_locked.Get() = worker;
825824
// Initialize metrics agent client.
826825
metrics_agent_client_ = std::make_unique<ray::rpc::MetricsAgentClientImpl>(
827-
"127.0.0.1",
828-
options_.metrics_agent_port,
829-
io_service_,
830-
*write_locked.Get()->client_call_manager_);
826+
"127.0.0.1", options_.metrics_agent_port, io_service_, *client_call_manager_);
831827
metrics_agent_client_->WaitForServerReady([this](const Status &server_status) {
832828
if (server_status.ok()) {
833829
stats::InitOpenTelemetryExporter(options_.metrics_agent_port);

src/ray/core_worker/core_worker_process.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -153,9 +153,6 @@ class CoreWorkerProcessImpl {
153153
/// The various options.
154154
const CoreWorkerOptions options_;
155155

156-
/// The core worker instance of this worker process.
157-
MutexProtected<std::shared_ptr<CoreWorker>> core_worker_;
158-
159156
/// The worker ID of this worker.
160157
const WorkerID worker_id_;
161158

@@ -166,6 +163,10 @@ class CoreWorkerProcessImpl {
166163
/// Keeps the io_service_ alive.
167164
boost::asio::executor_work_guard<boost::asio::io_context::executor_type> io_work_;
168165

166+
/// Shared client call manager across all gRPC clients in the core worker process.
167+
/// This is used by the CoreWorker and the MetricsAgentClient.
168+
std::unique_ptr<rpc::ClientCallManager> client_call_manager_;
169+
169170
/// Event loop where tasks are processed.
170171
/// task_execution_service_ should be destructed first to avoid
171172
/// issues like https://github.com/ray-project/ray/issues/18857
@@ -179,6 +180,9 @@ class CoreWorkerProcessImpl {
179180
// Thread that runs a boost::asio service to process IO events.
180181
boost::thread io_thread_;
181182

183+
/// The core worker instance of this worker process.
184+
MutexProtected<std::shared_ptr<CoreWorker>> core_worker_;
185+
182186
/// The proxy service handler that routes the RPC calls to the core worker.
183187
std::unique_ptr<CoreWorkerServiceHandlerProxy> service_handler_;
184188

src/ray/core_worker/tests/core_worker_test.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ class CoreWorkerTest : public ::testing::Test {
9696
return Status::OK();
9797
};
9898

99-
auto client_call_manager = std::make_unique<rpc::ClientCallManager>(
99+
client_call_manager_ = std::make_unique<rpc::ClientCallManager>(
100100
io_service_, /*record_stats=*/false, /*local_address=*/"");
101101

102102
auto core_worker_client_pool =
@@ -167,7 +167,7 @@ class CoreWorkerTest : public ::testing::Test {
167167

168168
auto task_event_buffer = std::make_unique<worker::TaskEventBufferImpl>(
169169
std::make_unique<gcs::MockGcsClient>(),
170-
std::make_unique<rpc::EventAggregatorClientImpl>(0, *client_call_manager),
170+
std::make_unique<rpc::EventAggregatorClientImpl>(0, *client_call_manager_),
171171
"test_session");
172172

173173
task_manager_ = std::make_shared<TaskManager>(
@@ -248,7 +248,6 @@ class CoreWorkerTest : public ::testing::Test {
248248
core_worker_ = std::make_shared<CoreWorker>(std::move(options),
249249
std::move(worker_context),
250250
io_service_,
251-
std::move(client_call_manager),
252251
std::move(core_worker_client_pool),
253252
std::move(raylet_client_pool),
254253
std::move(periodical_runner),
@@ -289,6 +288,7 @@ class CoreWorkerTest : public ::testing::Test {
289288
boost::thread io_thread_;
290289

291290
rpc::Address rpc_address_;
291+
std::unique_ptr<rpc::ClientCallManager> client_call_manager_;
292292
std::shared_ptr<ReferenceCounterInterface> reference_counter_;
293293
std::shared_ptr<CoreWorkerMemoryStore> memory_store_;
294294
ActorTaskSubmitter *actor_task_submitter_;

0 commit comments

Comments
 (0)