diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index 71993125f913..0d91c06439c2 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -96,6 +96,7 @@ void GcsNodeManager::HandleRegisterNode(rpc::RegisterNodeRequest request, AddNode(std::make_shared(request.node_info())); WriteNodeExportEvent(request.node_info()); GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); + WriteNodeExportEvent(request.node_info()); }; if (request.node_info().is_head_node()) { // mark all old head nodes as dead if exists: @@ -428,6 +429,7 @@ void GcsNodeManager::OnNodeFailure(const NodeID &node_id, node_table_updated_callback(Status::OK()); } RAY_CHECK_OK(gcs_publisher_->PublishNodeInfo(node_id, *node_info_delta, nullptr)); + WriteNodeExportEvent(*node); }; RAY_CHECK_OK(gcs_table_storage_->NodeTable().Put(node_id, *node, on_done)); } else if (node_table_updated_callback != nullptr) { diff --git a/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc index a8a0157e0d54..e13a347e8007 100644 --- a/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc @@ -12,7 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include +#include // clang-format off #include "gtest/gtest.h" @@ -21,9 +23,11 @@ #include "ray/rpc/node_manager/node_manager_client.h" #include "ray/rpc/node_manager/node_manager_client_pool.h" #include "mock/ray/pubsub/publisher.h" +#include "ray/util/event.h" // clang-format on namespace ray { + class GcsNodeManagerTest : public ::testing::Test { public: GcsNodeManagerTest() { @@ -41,6 +45,54 @@ class GcsNodeManagerTest : public ::testing::Test { std::shared_ptr gcs_publisher_; }; +std::string GenerateLogDir() { + std::string log_dir_generate = std::string(5, ' '); + FillRandom(&log_dir_generate); + std::string log_dir = "event" + StringToHex(log_dir_generate); + return log_dir; +} + +class GcsNodeManagerExportAPITest : public ::testing::Test { + public: + GcsNodeManagerExportAPITest() { + raylet_client_ = std::make_shared(); + client_pool_ = std::make_shared( + [this](const rpc::Address &) { return raylet_client_; }); + gcs_publisher_ = std::make_shared( + std::make_unique()); + gcs_table_storage_ = std::make_shared(io_service_); + + RayConfig::instance().initialize( + R"( +{ + "enable_export_api_write": true +} + )"); + log_dir_ = GenerateLogDir(); + const std::vector source_types = { + rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_NODE}; + RayEventInit_(source_types, + absl::flat_hash_map(), + log_dir_, + "warning", + false); + } + + virtual ~GcsNodeManagerExportAPITest() { + io_service_.stop(); + EventManager::Instance().ClearReporters(); + std::filesystem::remove_all(log_dir_.c_str()); + } + + protected: + std::shared_ptr gcs_table_storage_; + std::shared_ptr raylet_client_; + std::shared_ptr client_pool_; + std::shared_ptr gcs_publisher_; + instrumented_io_context io_service_; + std::string log_dir_; +}; + TEST_F(GcsNodeManagerTest, TestManagement) { gcs::GcsNodeManager node_manager( gcs_publisher_, gcs_table_storage_, client_pool_, ClusterID::Nil()); @@ -96,6 +148,59 @@ TEST_F(GcsNodeManagerTest, TestListener) { } } +TEST_F(GcsNodeManagerExportAPITest, TestExportEventRegisterNode) { + // Test export event is written when a node is added with HandleRegisterNode + gcs::GcsNodeManager node_manager( + gcs_publisher_, gcs_table_storage_, client_pool_, ClusterID::Nil()); + auto node = Mocker::GenNodeInfo(); + + rpc::RegisterNodeRequest register_request; + register_request.mutable_node_info()->CopyFrom(*node); + rpc::RegisterNodeReply register_reply; + auto send_reply_callback = + [](ray::Status status, std::function f1, std::function f2) {}; + + node_manager.HandleRegisterNode(register_request, ®ister_reply, send_reply_callback); + io_service_.poll(); + + std::vector vc; + Mocker::ReadContentFromFile(vc, log_dir_ + "/events/event_EXPORT_NODE.log"); + ASSERT_EQ((int)vc.size(), 1); + json event_data = json::parse(vc[0])["event_data"].get(); + ASSERT_EQ(event_data["state"], "ALIVE"); +} + +TEST_F(GcsNodeManagerExportAPITest, TestExportEventUnregisterNode) { + // Test export event is written when a node is removed with HandleUnregisterNode + gcs::GcsNodeManager node_manager( + gcs_publisher_, gcs_table_storage_, client_pool_, ClusterID::Nil()); + auto node = Mocker::GenNodeInfo(); + auto node_id = NodeID::FromBinary(node->node_id()); + node_manager.AddNode(node); + + rpc::UnregisterNodeRequest unregister_request; + unregister_request.set_node_id(node_id.Binary()); + unregister_request.mutable_node_death_info()->set_reason( + rpc::NodeDeathInfo::UNEXPECTED_TERMINATION); + unregister_request.mutable_node_death_info()->set_reason_message("mock reason message"); + rpc::UnregisterNodeReply unregister_reply; + auto send_reply_callback = + [](ray::Status status, std::function f1, std::function f2) {}; + + node_manager.HandleUnregisterNode( + unregister_request, &unregister_reply, send_reply_callback); + io_service_.poll(); + + std::vector vc; + Mocker::ReadContentFromFile(vc, log_dir_ + "/events/event_EXPORT_NODE.log"); + ASSERT_EQ((int)vc.size(), 1); + json event_data = json::parse(vc[0])["event_data"].get(); + ASSERT_EQ(event_data["state"], "DEAD"); + // Verify death cause for last node DEAD event + ASSERT_EQ(event_data["death_info"]["reason"], "UNEXPECTED_TERMINATION"); + ASSERT_EQ(event_data["death_info"]["reason_message"], "mock reason message"); +} + } // namespace ray int main(int argc, char **argv) {