@@ -132,8 +132,6 @@ std::shared_ptr<CoreWorker> CoreWorkerProcess::TryGetWorker() {
132132std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker (
133133 CoreWorkerOptions options, const WorkerID &worker_id) {
134134 // / Event loop where the IO events are handled. e.g. async GCS operations.
135- auto client_call_manager = std::make_unique<rpc::ClientCallManager>(
136- io_service_, /* record_stats=*/ false , options.node_ip_address );
137135 auto periodical_runner = PeriodicalRunner::Create (io_service_);
138136 auto worker_context = std::make_unique<WorkerContext>(
139137 options.worker_type , worker_id, GetProcessJobID (options));
@@ -166,7 +164,7 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
166164 auto task_event_buffer = std::make_unique<worker::TaskEventBufferImpl>(
167165 std::make_unique<gcs::GcsClient>(options.gcs_options , options.node_ip_address ),
168166 std::make_unique<rpc::EventAggregatorClientImpl>(options.metrics_agent_port ,
169- *client_call_manager ),
167+ *client_call_manager_ ),
170168 options.session_name );
171169
172170 // Start the IO thread first to make sure the checker is working.
@@ -237,7 +235,7 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
237235 local_node_id, options.node_ip_address , options.node_manager_port );
238236 auto local_raylet_rpc_client =
239237 std::make_shared<rpc::RayletClient>(std::move (raylet_address),
240- *client_call_manager ,
238+ *client_call_manager_ ,
241239 /* raylet_unavailable_timeout_callback=*/ [] {});
242240 auto core_worker_server =
243241 std::make_unique<rpc::GrpcServer>(WorkerTypeString (options.worker_type ),
@@ -277,7 +275,7 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
277275 auto core_worker = GetCoreWorker ();
278276 return std::make_shared<ray::rpc::RayletClient>(
279277 addr,
280- *core_worker-> client_call_manager_ ,
278+ *client_call_manager_,
281279 rpc::RayletClientPool::GetDefaultUnavailableTimeoutCallback (
282280 core_worker->gcs_client_ .get (),
283281 core_worker->raylet_client_pool_ .get (),
@@ -289,7 +287,7 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
289287 auto core_worker = GetCoreWorker ();
290288 return std::make_shared<rpc::CoreWorkerClient>(
291289 addr,
292- *core_worker-> client_call_manager_ ,
290+ *client_call_manager_,
293291 rpc::CoreWorkerClientPool::GetDefaultUnavailableTimeoutCallback (
294292 core_worker->gcs_client_ .get (),
295293 core_worker->core_worker_client_pool_ .get (),
@@ -658,7 +656,6 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
658656 std::make_shared<CoreWorker>(std::move (options),
659657 std::move (worker_context),
660658 io_service_,
661- std::move (client_call_manager),
662659 std::move (core_worker_client_pool),
663660 std::move (raylet_client_pool),
664661 std::move (periodical_runner),
@@ -696,6 +693,8 @@ CoreWorkerProcessImpl::CoreWorkerProcessImpl(const CoreWorkerOptions &options)
696693 ? ComputeDriverIdFromJob(options_.job_id)
697694 : WorkerID::FromRandom()),
698695 io_work_(io_service_.get_executor()),
696+ client_call_manager_(std::make_unique<rpc::ClientCallManager>(
697+ io_service_, /* record_stats=*/ false , options.node_ip_address)),
699698 task_execution_service_work_(task_execution_service_.get_executor()),
700699 service_handler_(std::make_unique<CoreWorkerServiceHandlerProxy>()) {
701700 if (options_.enable_logging ) {
@@ -824,10 +823,7 @@ CoreWorkerProcessImpl::CoreWorkerProcessImpl(const CoreWorkerOptions &options)
824823 write_locked.Get () = worker;
825824 // Initialize metrics agent client.
826825 metrics_agent_client_ = std::make_unique<ray::rpc::MetricsAgentClientImpl>(
827- " 127.0.0.1" ,
828- options_.metrics_agent_port ,
829- io_service_,
830- *write_locked.Get ()->client_call_manager_ );
826+ " 127.0.0.1" , options_.metrics_agent_port , io_service_, *client_call_manager_);
831827 metrics_agent_client_->WaitForServerReady ([this ](const Status &server_status) {
832828 if (server_status.ok ()) {
833829 stats::InitOpenTelemetryExporter (options_.metrics_agent_port );
0 commit comments