1515#include " ray/raylet/node_manager.h"
1616
1717#include < algorithm>
18+ #include < boost/bind/bind.hpp>
1819#include < cctype>
1920#include < cerrno>
2021#include < csignal>
@@ -121,6 +122,33 @@ void CleanupProcessGroupSend(pid_t saved_pgid,
121122}
122123#endif
123124
125+ std::vector<std::string> GenerateEnumNames (const char *const *enum_names_ptr,
126+ int start_index,
127+ int end_index) {
128+ std::vector<std::string> enum_names;
129+ enum_names.reserve (start_index);
130+ for (int i = 0 ; i < start_index; ++i) {
131+ enum_names.emplace_back (" EmptyMessageType" );
132+ }
133+ size_t i = 0 ;
134+ while (true ) {
135+ const char *name = enum_names_ptr[i];
136+ if (name == nullptr ) {
137+ break ;
138+ }
139+ enum_names.emplace_back (name);
140+ i++;
141+ }
142+ RAY_CHECK (static_cast <size_t >(end_index) == enum_names.size () - 1 )
143+ << " Message Type mismatch!" ;
144+ return enum_names;
145+ }
146+
147+ const std::vector<std::string> node_manager_message_enum =
148+ GenerateEnumNames (ray::protocol::EnumNamesMessageType(),
149+ static_cast <int >(ray::protocol::MessageType::MIN),
150+ static_cast <int >(ray::protocol::MessageType::MAX));
151+
124152} // namespace
125153
126154NodeManager::NodeManager (
@@ -148,7 +176,9 @@ NodeManager::NodeManager(
148176 std::function<void (const rpc::NodeDeathInfo &)> shutdown_raylet_gracefully,
149177 AddProcessToCgroupHook add_process_to_system_cgroup_hook,
150178 std::unique_ptr<CgroupManagerInterface> cgroup_manager,
151- std::atomic_bool &shutting_down)
179+ std::atomic_bool &shutting_down,
180+ boost::asio::basic_socket_acceptor<local_stream_protocol> acceptor,
181+ local_stream_socket socket)
152182 : self_node_id_(self_node_id),
153183 self_node_name_ (std::move(self_node_name)),
154184 io_service_(io_service),
@@ -202,7 +232,9 @@ NodeManager::NodeManager(
202232 CreateMemoryUsageRefreshCallback())),
203233 add_process_to_system_cgroup_hook_(std::move(add_process_to_system_cgroup_hook)),
204234 cgroup_manager_(std::move(cgroup_manager)),
205- shutting_down_(shutting_down) {
235+ shutting_down_(shutting_down),
236+ acceptor_(std::move(acceptor)),
237+ socket_(std::move(socket)) {
206238 RAY_LOG (INFO).WithField (kLogKeyNodeID , self_node_id_) << " Initializing NodeManager" ;
207239
208240 placement_group_resource_manager_ =
@@ -251,6 +283,29 @@ NodeManager::NodeManager(
251283 " NodeManager.GCTaskFailureReason" );
252284}
253285
286+ void NodeManager::Start (rpc::GcsNodeInfo &&self_node_info) {
287+ auto register_callback =
288+ [this ,
289+ object_manager_port = self_node_info.object_manager_port ()](const Status &status) {
290+ RAY_CHECK_OK (status);
291+ RAY_LOG (INFO) << " Raylet of id, " << self_node_id_
292+ << " started. Raylet consists of node_manager and object_manager."
293+ << " node_manager address: "
294+ << BuildAddress (initial_config_.node_manager_address ,
295+ initial_config_.node_manager_port )
296+ << " object_manager address: "
297+ << BuildAddress (initial_config_.node_manager_address ,
298+ object_manager_port)
299+ << " hostname: " << boost::asio::ip::host_name ();
300+ this ->RegisterGcs ();
301+ };
302+ gcs_client_.Nodes ().RegisterSelf (std::move (self_node_info), register_callback);
303+
304+ acceptor_.async_accept (
305+ socket_,
306+ boost::bind (&NodeManager::HandleAccept, this , boost::asio::placeholders::error));
307+ }
308+
254309void NodeManager::RegisterGcs () {
255310 auto on_node_change = [this ](const NodeID &node_id,
256311 const rpc::GcsNodeAddressAndLiveness &data) {
@@ -410,6 +465,45 @@ void NodeManager::RegisterGcs() {
410465 " NodeManager.GcsCheckAlive" );
411466}
412467
468+ void NodeManager::HandleAccept (const boost::system::error_code &error) {
469+ if (!error) {
470+ ConnectionErrorHandler error_handler =
471+ [this ](const std::shared_ptr<ClientConnection> &client,
472+ const boost::system::error_code &err) {
473+ this ->HandleClientConnectionError (client, err);
474+ };
475+
476+ MessageHandler message_handler = [this ](
477+ const std::shared_ptr<ClientConnection> &client,
478+ int64_t message_type,
479+ const std::vector<uint8_t > &message) {
480+ this ->ProcessClientMessage (client, message_type, message.data ());
481+ };
482+
483+ // Accept a new local client and dispatch it to the node manager.
484+ auto conn = ClientConnection::Create (message_handler,
485+ error_handler,
486+ std::move (socket_),
487+ " worker" ,
488+ node_manager_message_enum);
489+
490+ // Begin processing messages. The message handler above is expected to call this to
491+ // continue processing messages.
492+ conn->ProcessMessages ();
493+ } else {
494+ RAY_LOG (ERROR) << " Raylet failed to accept new connection: " << error.message ();
495+ if (error == boost::asio::error::operation_aborted) {
496+ // The server is being destroyed. Don't continue accepting connections.
497+ return ;
498+ }
499+ };
500+
501+ // We're ready to accept another client.
502+ acceptor_.async_accept (
503+ socket_,
504+ boost::bind (&NodeManager::HandleAccept, this , boost::asio::placeholders::error));
505+ }
506+
413507void NodeManager::DestroyWorker (std::shared_ptr<WorkerInterface> worker,
414508 rpc::WorkerExitType disconnect_type,
415509 const std::string &disconnect_detail,
@@ -1298,7 +1392,6 @@ void NodeManager::HandleWorkerAvailable(const std::shared_ptr<WorkerInterface> &
12981392}
12991393
13001394namespace {
1301-
13021395void SendDisconnectClientReply (const WorkerID &worker_id,
13031396 const std::shared_ptr<ClientConnection> &client) {
13041397 flatbuffers::FlatBufferBuilder fbb;
@@ -2821,8 +2914,8 @@ void NodeManager::Stop() {
28212914#if !defined(_WIN32)
28222915 // Best-effort process-group cleanup for any remaining workers before shutdown.
28232916 if (RayConfig::instance ().process_group_cleanup_enabled ()) {
2824- auto workers = worker_pool_.GetAllRegisteredWorkers (/* filter_dead_worker */ true ,
2825- /* filter_io_workers */ false );
2917+ auto workers = worker_pool_.GetAllRegisteredWorkers (/* filter_dead_workers= */ true ,
2918+ /* filter_io_workers= */ false );
28262919 for (const auto &w : workers) {
28272920 auto saved = w->GetSavedProcessGroupId ();
28282921 if (saved.has_value ()) {
@@ -2840,6 +2933,7 @@ void NodeManager::Stop() {
28402933 object_manager_.Stop ();
28412934 dashboard_agent_manager_.reset ();
28422935 runtime_env_agent_manager_.reset ();
2936+ acceptor_.close ();
28432937}
28442938
28452939void NodeManager::RecordMetrics () {
0 commit comments