From d4a52ea33323dcc521deccdb566fec88550ff5b1 Mon Sep 17 00:00:00 2001 From: Nikita Vemuri Date: Tue, 3 Sep 2024 08:52:27 -0700 Subject: [PATCH] [observability][export-api] Write node events (#47422) Same code changes as [observability][export-api] Write node events #47221 Move test into a separate file to create a separate bazel target that can be skipped on Windows --- BUILD.bazel | 19 +++ src/ray/gcs/gcs_server/gcs_node_manager.cc | 35 ++++- src/ray/gcs/gcs_server/gcs_node_manager.h | 45 ++++++ src/ray/gcs/gcs_server/gcs_server_main.cc | 6 +- .../gcs_node_manager_export_event_test.cc | 137 ++++++++++++++++++ src/ray/gcs/test/gcs_test_util.h | 12 ++ src/ray/util/event.cc | 10 +- 7 files changed, 261 insertions(+), 3 deletions(-) create mode 100644 src/ray/gcs/gcs_server/test/gcs_node_manager_export_event_test.cc diff --git a/BUILD.bazel b/BUILD.bazel index 92d44ddccbd4..10b14b4d0624 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -1637,6 +1637,25 @@ ray_cc_test( ], ) +ray_cc_test( + name = "gcs_node_manager_export_event_test", + size = "small", + srcs = [ + "src/ray/gcs/gcs_server/test/gcs_node_manager_export_event_test.cc", + ], + tags = [ + "no_windows", + "team:core" + ], + deps = [ + ":gcs_server_lib", + ":gcs_server_test_util", + ":gcs_test_util_lib", + ":ray_mock", + "@com_google_googletest//:gtest_main", + ], +) + ray_cc_test( name = "gcs_job_manager_test", size = "small", diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index fe55fb3944d2..249b520091cd 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -39,6 +39,36 @@ GcsNodeManager::GcsNodeManager( raylet_client_pool_(std::move(raylet_client_pool)), cluster_id_(cluster_id) {} +void GcsNodeManager::WriteNodeExportEvent(rpc::GcsNodeInfo node_info) const { + /// Write node_info as a export node event if + /// enable_export_api_write() is enabled. + if (!RayConfig::instance().enable_export_api_write()) { + return; + } + std::shared_ptr export_node_data_ptr = + std::make_shared(); + export_node_data_ptr->set_node_id(node_info.node_id()); + export_node_data_ptr->set_node_manager_address(node_info.node_manager_address()); + export_node_data_ptr->mutable_resources_total()->insert( + node_info.resources_total().begin(), node_info.resources_total().end()); + export_node_data_ptr->set_node_name(node_info.node_name()); + export_node_data_ptr->set_start_time_ms(node_info.start_time_ms()); + export_node_data_ptr->set_end_time_ms(node_info.end_time_ms()); + export_node_data_ptr->set_is_head_node(node_info.is_head_node()); + export_node_data_ptr->mutable_labels()->insert(node_info.labels().begin(), + node_info.labels().end()); + export_node_data_ptr->set_state(ConvertGCSNodeStateToExport(node_info.state())); + if (!node_info.death_info().reason_message().empty() || + node_info.death_info().reason() != + rpc::NodeDeathInfo_Reason::NodeDeathInfo_Reason_UNSPECIFIED) { + export_node_data_ptr->mutable_death_info()->set_reason_message( + node_info.death_info().reason_message()); + export_node_data_ptr->mutable_death_info()->set_reason( + ConvertNodeDeathReasonToExport(node_info.death_info().reason())); + } + RayExportEvent(export_node_data_ptr).SendEvent(); +} + // Note: ServerCall will populate the cluster_id. void GcsNodeManager::HandleGetClusterId(rpc::GetClusterIdRequest request, rpc::GetClusterIdReply *reply, @@ -63,6 +93,7 @@ void GcsNodeManager::HandleRegisterNode(rpc::RegisterNodeRequest request, << ", node name = " << request.node_info().node_name(); RAY_CHECK_OK(gcs_publisher_->PublishNodeInfo(node_id, request.node_info(), nullptr)); AddNode(std::make_shared(request.node_info())); + WriteNodeExportEvent(request.node_info()); GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; if (request.node_info().is_head_node()) { @@ -132,6 +163,7 @@ void GcsNodeManager::HandleUnregisterNode(rpc::UnregisterNodeRequest request, auto on_put_done = [=](const Status &status) { RAY_CHECK_OK(gcs_publisher_->PublishNodeInfo(node_id, *node_info_delta, nullptr)); + WriteNodeExportEvent(*node); }; RAY_CHECK_OK(gcs_table_storage_->NodeTable().Put(node_id, *node, on_put_done)); GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); @@ -403,8 +435,9 @@ void GcsNodeManager::OnNodeFailure(const NodeID &node_id, node_info_delta->set_end_time_ms(node->end_time_ms()); node_info_delta->mutable_death_info()->CopyFrom(node->death_info()); - auto on_done = [this, node_id, node_table_updated_callback, node_info_delta]( + auto on_done = [this, node_id, node_table_updated_callback, node_info_delta, node]( const Status &status) { + WriteNodeExportEvent(*node); if (node_table_updated_callback != nullptr) { node_table_updated_callback(Status::OK()); } diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.h b/src/ray/gcs/gcs_server/gcs_node_manager.h index edefa416cda9..7db037e767e5 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.h +++ b/src/ray/gcs/gcs_server/gcs_node_manager.h @@ -31,6 +31,7 @@ #include "ray/rpc/gcs_server/gcs_rpc_server.h" #include "ray/rpc/node_manager/node_manager_client.h" #include "ray/rpc/node_manager/node_manager_client_pool.h" +#include "ray/util/event.h" #include "src/ray/protobuf/gcs.pb.h" namespace ray { @@ -184,6 +185,50 @@ class GcsNodeManager : public rpc::NodeInfoHandler { /// \return The inferred death info of the node. rpc::NodeDeathInfo InferDeathInfo(const NodeID &node_id); + void WriteNodeExportEvent(rpc::GcsNodeInfo node_info) const; + + rpc::ExportNodeData::GcsNodeState ConvertGCSNodeStateToExport( + rpc::GcsNodeInfo::GcsNodeState node_state) const { + switch (node_state) { + case rpc::GcsNodeInfo_GcsNodeState::GcsNodeInfo_GcsNodeState_ALIVE: + return rpc::ExportNodeData_GcsNodeState::ExportNodeData_GcsNodeState_ALIVE; + case rpc::GcsNodeInfo_GcsNodeState::GcsNodeInfo_GcsNodeState_DEAD: + return rpc::ExportNodeData_GcsNodeState::ExportNodeData_GcsNodeState_DEAD; + default: + // Unknown rpc::GcsNodeInfo::GcsNodeState value + RAY_LOG(FATAL) << "Invalid value for rpc::GcsNodeInfo::GcsNodeState " + << rpc::GcsNodeInfo::GcsNodeState_Name(node_state); + return rpc::ExportNodeData_GcsNodeState::ExportNodeData_GcsNodeState_DEAD; + } + } + + rpc::ExportNodeData::NodeDeathInfo::Reason ConvertNodeDeathReasonToExport( + rpc::NodeDeathInfo::Reason reason) const { + switch (reason) { + case rpc::NodeDeathInfo_Reason::NodeDeathInfo_Reason_UNSPECIFIED: + return rpc::ExportNodeData_NodeDeathInfo_Reason:: + ExportNodeData_NodeDeathInfo_Reason_UNSPECIFIED; + case rpc::NodeDeathInfo_Reason::NodeDeathInfo_Reason_EXPECTED_TERMINATION: + return rpc::ExportNodeData_NodeDeathInfo_Reason:: + ExportNodeData_NodeDeathInfo_Reason_EXPECTED_TERMINATION; + case rpc::NodeDeathInfo_Reason::NodeDeathInfo_Reason_UNEXPECTED_TERMINATION: + return rpc::ExportNodeData_NodeDeathInfo_Reason:: + ExportNodeData_NodeDeathInfo_Reason_UNEXPECTED_TERMINATION; + case rpc::NodeDeathInfo_Reason::NodeDeathInfo_Reason_AUTOSCALER_DRAIN_PREEMPTED: + return rpc::ExportNodeData_NodeDeathInfo_Reason:: + ExportNodeData_NodeDeathInfo_Reason_AUTOSCALER_DRAIN_PREEMPTED; + case rpc::NodeDeathInfo_Reason::NodeDeathInfo_Reason_AUTOSCALER_DRAIN_IDLE: + return rpc::ExportNodeData_NodeDeathInfo_Reason:: + ExportNodeData_NodeDeathInfo_Reason_AUTOSCALER_DRAIN_IDLE; + default: + // Unknown rpc::GcsNodeInfo::GcsNodeState value + RAY_LOG(FATAL) << "Invalid value for rpc::NodeDeathInfo::Reason " + << rpc::NodeDeathInfo::Reason_Name(reason); + return rpc::ExportNodeData_NodeDeathInfo_Reason:: + ExportNodeData_NodeDeathInfo_Reason_UNSPECIFIED; + } + } + /// Alive nodes. absl::flat_hash_map> alive_nodes_; /// Draining nodes. diff --git a/src/ray/gcs/gcs_server/gcs_server_main.cc b/src/ray/gcs/gcs_server/gcs_server_main.cc index b7750371632b..e2e3da236c8e 100644 --- a/src/ray/gcs/gcs_server/gcs_server_main.cc +++ b/src/ray/gcs/gcs_server/gcs_server_main.cc @@ -88,8 +88,12 @@ int main(int argc, char *argv[]) { // Initialize event framework. if (RayConfig::instance().event_log_reporter_enabled() && !log_dir.empty()) { + // This GCS server process emits GCS standard events, and Node export events + // so the various source types are passed to RayEventInit. The type of an + // event is determined by the schema of its event data. const std::vector source_types = { - ray::rpc::Event_SourceType::Event_SourceType_GCS}; + ray::rpc::Event_SourceType::Event_SourceType_GCS, + ray::rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_NODE}; ray::RayEventInit(source_types, absl::flat_hash_map(), log_dir, diff --git a/src/ray/gcs/gcs_server/test/gcs_node_manager_export_event_test.cc b/src/ray/gcs/gcs_server/test/gcs_node_manager_export_event_test.cc new file mode 100644 index 000000000000..d43d6825321d --- /dev/null +++ b/src/ray/gcs/gcs_server/test/gcs_node_manager_export_event_test.cc @@ -0,0 +1,137 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +// clang-format off +#include "gtest/gtest.h" +#include "ray/gcs/gcs_server/test/gcs_server_test_util.h" +#include "ray/gcs/test/gcs_test_util.h" +#include "ray/rpc/node_manager/node_manager_client.h" +#include "ray/rpc/node_manager/node_manager_client_pool.h" +#include "mock/ray/pubsub/publisher.h" +#include "ray/util/event.h" +// clang-format on + +namespace ray { + +std::string GenerateLogDir() { + std::string log_dir_generate = std::string(5, ' '); + FillRandom(&log_dir_generate); + std::string log_dir = "event" + StringToHex(log_dir_generate); + return log_dir; +} + +class GcsNodeManagerExportAPITest : public ::testing::Test { + public: + GcsNodeManagerExportAPITest() { + raylet_client_ = std::make_shared(); + client_pool_ = std::make_shared( + [this](const rpc::Address &) { return raylet_client_; }); + gcs_publisher_ = std::make_shared( + std::make_unique()); + gcs_table_storage_ = std::make_shared(io_service_); + + RayConfig::instance().initialize( + R"( +{ + "enable_export_api_write": true +} + )"); + log_dir_ = GenerateLogDir(); + const std::vector source_types = { + rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_NODE}; + RayEventInit_(source_types, + absl::flat_hash_map(), + log_dir_, + "warning", + false); + } + + virtual ~GcsNodeManagerExportAPITest() { + io_service_.stop(); + EventManager::Instance().ClearReporters(); + std::filesystem::remove_all(log_dir_.c_str()); + } + + protected: + std::shared_ptr gcs_table_storage_; + std::shared_ptr raylet_client_; + std::shared_ptr client_pool_; + std::shared_ptr gcs_publisher_; + instrumented_io_context io_service_; + std::string log_dir_; +}; + +TEST_F(GcsNodeManagerExportAPITest, TestExportEventRegisterNode) { + // Test export event is written when a node is added with HandleRegisterNode + gcs::GcsNodeManager node_manager( + gcs_publisher_, gcs_table_storage_, client_pool_, ClusterID::Nil()); + auto node = Mocker::GenNodeInfo(); + + rpc::RegisterNodeRequest register_request; + register_request.mutable_node_info()->CopyFrom(*node); + rpc::RegisterNodeReply register_reply; + auto send_reply_callback = + [](ray::Status status, std::function f1, std::function f2) {}; + + node_manager.HandleRegisterNode(register_request, ®ister_reply, send_reply_callback); + io_service_.poll(); + + std::vector vc; + Mocker::ReadContentFromFile(vc, log_dir_ + "/events/event_EXPORT_NODE.log"); + ASSERT_EQ((int)vc.size(), 1); + json event_data = json::parse(vc[0])["event_data"].get(); + ASSERT_EQ(event_data["state"], "ALIVE"); +} + +TEST_F(GcsNodeManagerExportAPITest, TestExportEventUnregisterNode) { + // Test export event is written when a node is removed with HandleUnregisterNode + gcs::GcsNodeManager node_manager( + gcs_publisher_, gcs_table_storage_, client_pool_, ClusterID::Nil()); + auto node = Mocker::GenNodeInfo(); + auto node_id = NodeID::FromBinary(node->node_id()); + node_manager.AddNode(node); + + rpc::UnregisterNodeRequest unregister_request; + unregister_request.set_node_id(node_id.Binary()); + unregister_request.mutable_node_death_info()->set_reason( + rpc::NodeDeathInfo::UNEXPECTED_TERMINATION); + unregister_request.mutable_node_death_info()->set_reason_message("mock reason message"); + rpc::UnregisterNodeReply unregister_reply; + auto send_reply_callback = + [](ray::Status status, std::function f1, std::function f2) {}; + + node_manager.HandleUnregisterNode( + unregister_request, &unregister_reply, send_reply_callback); + io_service_.poll(); + + std::vector vc; + Mocker::ReadContentFromFile(vc, log_dir_ + "/events/event_EXPORT_NODE.log"); + ASSERT_EQ((int)vc.size(), 1); + json event_data = json::parse(vc[0])["event_data"].get(); + ASSERT_EQ(event_data["state"], "DEAD"); + // Verify death cause for last node DEAD event + ASSERT_EQ(event_data["death_info"]["reason"], "UNEXPECTED_TERMINATION"); + ASSERT_EQ(event_data["death_info"]["reason_message"], "mock reason message"); +} + +} // namespace ray + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} \ No newline at end of file diff --git a/src/ray/gcs/test/gcs_test_util.h b/src/ray/gcs/test/gcs_test_util.h index 81ec8d11ebbf..72565ba07564 100644 --- a/src/ray/gcs/test/gcs_test_util.h +++ b/src/ray/gcs/test/gcs_test_util.h @@ -14,6 +14,8 @@ #pragma once +#include +#include #include #include @@ -426,6 +428,16 @@ struct Mocker { } return constraint; } + // Read all lines of a file into vector vc + static void ReadContentFromFile(std::vector &vc, std::string log_file) { + std::string line; + std::ifstream read_file; + read_file.open(log_file, std::ios::binary); + while (std::getline(read_file, line)) { + vc.push_back(line); + } + read_file.close(); + } }; } // namespace ray diff --git a/src/ray/util/event.cc b/src/ray/util/event.cc index 1a4709c3c1cd..c49a42d30a20 100644 --- a/src/ray/util/event.cc +++ b/src/ray/util/event.cc @@ -130,10 +130,16 @@ std::string LogEventReporter::ExportEventToString(const rpc::ExportEvent &export std::string event_data_as_string; google::protobuf::util::JsonPrintOptions options; options.preserve_proto_field_names = true; + // Required so enum with value 0 is not omitted + options.always_print_primitive_fields = true; if (export_event.has_task_event_data()) { RAY_CHECK(google::protobuf::util::MessageToJsonString( export_event.task_event_data(), &event_data_as_string, options) .ok()); + } else if (export_event.has_node_event_data()) { + RAY_CHECK(google::protobuf::util::MessageToJsonString( + export_event.node_event_data(), &event_data_as_string, options) + .ok()); } else { RAY_LOG(FATAL) << "event_data missing from export event with id " << export_event.event_id() @@ -180,7 +186,9 @@ EventManager &EventManager::Instance() { return instance_; } -bool EventManager::IsEmpty() { return reporter_map_.empty(); } +bool EventManager::IsEmpty() { + return reporter_map_.empty() && export_log_reporter_map_.empty(); +} void EventManager::Publish(const rpc::Event &event, const json &custom_fields) { for (const auto &element : reporter_map_) {