From b526079f01c11ed4d0313ac7be7c66349b36e36e Mon Sep 17 00:00:00 2001 From: Cuong Nguyen <128072568+can-anyscale@users.noreply.github.com> Date: Thu, 29 Aug 2024 09:43:55 -0700 Subject: [PATCH] Revert "[observability][export-api] Write node events (#47221)" This reverts commit d425f31c3489c9f0038262cff45d025c405d9ea5. --- src/ray/gcs/gcs_server/gcs_node_manager.cc | 35 +----- src/ray/gcs/gcs_server/gcs_node_manager.h | 45 -------- src/ray/gcs/gcs_server/gcs_server_main.cc | 6 +- .../gcs_server/test/gcs_node_manager_test.cc | 105 ------------------ src/ray/gcs/test/gcs_test_util.h | 12 -- src/ray/util/event.cc | 10 +- 6 files changed, 3 insertions(+), 210 deletions(-) diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index 4386304a8022..fe55fb3944d2 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -39,36 +39,6 @@ GcsNodeManager::GcsNodeManager( raylet_client_pool_(std::move(raylet_client_pool)), cluster_id_(cluster_id) {} -void GcsNodeManager::WriteNodeExportEvent(rpc::GcsNodeInfo node_info) const { - /// Write node_info as a export node event if - /// enable_export_api_write() is enabled. - if (!RayConfig::instance().enable_export_api_write()) { - return; - } - std::shared_ptr export_node_data_ptr = - std::make_shared(); - export_node_data_ptr->set_node_id(node_info.node_id()); - export_node_data_ptr->set_node_manager_address(node_info.node_manager_address()); - export_node_data_ptr->mutable_resources_total()->insert( - node_info.resources_total().begin(), node_info.resources_total().end()); - export_node_data_ptr->set_node_name(node_info.node_name()); - export_node_data_ptr->set_start_time_ms(node_info.start_time_ms()); - export_node_data_ptr->set_end_time_ms(node_info.end_time_ms()); - export_node_data_ptr->set_is_head_node(node_info.is_head_node()); - export_node_data_ptr->mutable_labels()->insert(node_info.labels().begin(), - node_info.labels().end()); - export_node_data_ptr->set_state(ConvertGCSNodeStateToExport(node_info.state())); - if (!node_info.death_info().reason_message().empty() || - node_info.death_info().reason() != - rpc::NodeDeathInfo_Reason::NodeDeathInfo_Reason_UNSPECIFIED) { - export_node_data_ptr->mutable_death_info()->set_reason_message( - node_info.death_info().reason_message()); - export_node_data_ptr->mutable_death_info()->set_reason( - ConvertNodeDeathReasonToExport(node_info.death_info().reason())); - } - RayExportEvent(export_node_data_ptr).SendEvent(); -} - // Note: ServerCall will populate the cluster_id. void GcsNodeManager::HandleGetClusterId(rpc::GetClusterIdRequest request, rpc::GetClusterIdReply *reply, @@ -94,7 +64,6 @@ void GcsNodeManager::HandleRegisterNode(rpc::RegisterNodeRequest request, RAY_CHECK_OK(gcs_publisher_->PublishNodeInfo(node_id, request.node_info(), nullptr)); AddNode(std::make_shared(request.node_info())); GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); - WriteNodeExportEvent(request.node_info()); }; if (request.node_info().is_head_node()) { // mark all old head nodes as dead if exists: @@ -163,7 +132,6 @@ void GcsNodeManager::HandleUnregisterNode(rpc::UnregisterNodeRequest request, auto on_put_done = [=](const Status &status) { RAY_CHECK_OK(gcs_publisher_->PublishNodeInfo(node_id, *node_info_delta, nullptr)); - WriteNodeExportEvent(*node); }; RAY_CHECK_OK(gcs_table_storage_->NodeTable().Put(node_id, *node, on_put_done)); GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); @@ -435,13 +403,12 @@ void GcsNodeManager::OnNodeFailure(const NodeID &node_id, node_info_delta->set_end_time_ms(node->end_time_ms()); node_info_delta->mutable_death_info()->CopyFrom(node->death_info()); - auto on_done = [this, node_id, node_table_updated_callback, node_info_delta, node]( + auto on_done = [this, node_id, node_table_updated_callback, node_info_delta]( const Status &status) { if (node_table_updated_callback != nullptr) { node_table_updated_callback(Status::OK()); } RAY_CHECK_OK(gcs_publisher_->PublishNodeInfo(node_id, *node_info_delta, nullptr)); - WriteNodeExportEvent(*node); }; RAY_CHECK_OK(gcs_table_storage_->NodeTable().Put(node_id, *node, on_done)); } else if (node_table_updated_callback != nullptr) { diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.h b/src/ray/gcs/gcs_server/gcs_node_manager.h index 7db037e767e5..edefa416cda9 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.h +++ b/src/ray/gcs/gcs_server/gcs_node_manager.h @@ -31,7 +31,6 @@ #include "ray/rpc/gcs_server/gcs_rpc_server.h" #include "ray/rpc/node_manager/node_manager_client.h" #include "ray/rpc/node_manager/node_manager_client_pool.h" -#include "ray/util/event.h" #include "src/ray/protobuf/gcs.pb.h" namespace ray { @@ -185,50 +184,6 @@ class GcsNodeManager : public rpc::NodeInfoHandler { /// \return The inferred death info of the node. rpc::NodeDeathInfo InferDeathInfo(const NodeID &node_id); - void WriteNodeExportEvent(rpc::GcsNodeInfo node_info) const; - - rpc::ExportNodeData::GcsNodeState ConvertGCSNodeStateToExport( - rpc::GcsNodeInfo::GcsNodeState node_state) const { - switch (node_state) { - case rpc::GcsNodeInfo_GcsNodeState::GcsNodeInfo_GcsNodeState_ALIVE: - return rpc::ExportNodeData_GcsNodeState::ExportNodeData_GcsNodeState_ALIVE; - case rpc::GcsNodeInfo_GcsNodeState::GcsNodeInfo_GcsNodeState_DEAD: - return rpc::ExportNodeData_GcsNodeState::ExportNodeData_GcsNodeState_DEAD; - default: - // Unknown rpc::GcsNodeInfo::GcsNodeState value - RAY_LOG(FATAL) << "Invalid value for rpc::GcsNodeInfo::GcsNodeState " - << rpc::GcsNodeInfo::GcsNodeState_Name(node_state); - return rpc::ExportNodeData_GcsNodeState::ExportNodeData_GcsNodeState_DEAD; - } - } - - rpc::ExportNodeData::NodeDeathInfo::Reason ConvertNodeDeathReasonToExport( - rpc::NodeDeathInfo::Reason reason) const { - switch (reason) { - case rpc::NodeDeathInfo_Reason::NodeDeathInfo_Reason_UNSPECIFIED: - return rpc::ExportNodeData_NodeDeathInfo_Reason:: - ExportNodeData_NodeDeathInfo_Reason_UNSPECIFIED; - case rpc::NodeDeathInfo_Reason::NodeDeathInfo_Reason_EXPECTED_TERMINATION: - return rpc::ExportNodeData_NodeDeathInfo_Reason:: - ExportNodeData_NodeDeathInfo_Reason_EXPECTED_TERMINATION; - case rpc::NodeDeathInfo_Reason::NodeDeathInfo_Reason_UNEXPECTED_TERMINATION: - return rpc::ExportNodeData_NodeDeathInfo_Reason:: - ExportNodeData_NodeDeathInfo_Reason_UNEXPECTED_TERMINATION; - case rpc::NodeDeathInfo_Reason::NodeDeathInfo_Reason_AUTOSCALER_DRAIN_PREEMPTED: - return rpc::ExportNodeData_NodeDeathInfo_Reason:: - ExportNodeData_NodeDeathInfo_Reason_AUTOSCALER_DRAIN_PREEMPTED; - case rpc::NodeDeathInfo_Reason::NodeDeathInfo_Reason_AUTOSCALER_DRAIN_IDLE: - return rpc::ExportNodeData_NodeDeathInfo_Reason:: - ExportNodeData_NodeDeathInfo_Reason_AUTOSCALER_DRAIN_IDLE; - default: - // Unknown rpc::GcsNodeInfo::GcsNodeState value - RAY_LOG(FATAL) << "Invalid value for rpc::NodeDeathInfo::Reason " - << rpc::NodeDeathInfo::Reason_Name(reason); - return rpc::ExportNodeData_NodeDeathInfo_Reason:: - ExportNodeData_NodeDeathInfo_Reason_UNSPECIFIED; - } - } - /// Alive nodes. absl::flat_hash_map> alive_nodes_; /// Draining nodes. diff --git a/src/ray/gcs/gcs_server/gcs_server_main.cc b/src/ray/gcs/gcs_server/gcs_server_main.cc index e2e3da236c8e..b7750371632b 100644 --- a/src/ray/gcs/gcs_server/gcs_server_main.cc +++ b/src/ray/gcs/gcs_server/gcs_server_main.cc @@ -88,12 +88,8 @@ int main(int argc, char *argv[]) { // Initialize event framework. if (RayConfig::instance().event_log_reporter_enabled() && !log_dir.empty()) { - // This GCS server process emits GCS standard events, and Node export events - // so the various source types are passed to RayEventInit. The type of an - // event is determined by the schema of its event data. const std::vector source_types = { - ray::rpc::Event_SourceType::Event_SourceType_GCS, - ray::rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_NODE}; + ray::rpc::Event_SourceType::Event_SourceType_GCS}; ray::RayEventInit(source_types, absl::flat_hash_map(), log_dir, diff --git a/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc index e13a347e8007..a8a0157e0d54 100644 --- a/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc @@ -12,9 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include #include -#include // clang-format off #include "gtest/gtest.h" @@ -23,11 +21,9 @@ #include "ray/rpc/node_manager/node_manager_client.h" #include "ray/rpc/node_manager/node_manager_client_pool.h" #include "mock/ray/pubsub/publisher.h" -#include "ray/util/event.h" // clang-format on namespace ray { - class GcsNodeManagerTest : public ::testing::Test { public: GcsNodeManagerTest() { @@ -45,54 +41,6 @@ class GcsNodeManagerTest : public ::testing::Test { std::shared_ptr gcs_publisher_; }; -std::string GenerateLogDir() { - std::string log_dir_generate = std::string(5, ' '); - FillRandom(&log_dir_generate); - std::string log_dir = "event" + StringToHex(log_dir_generate); - return log_dir; -} - -class GcsNodeManagerExportAPITest : public ::testing::Test { - public: - GcsNodeManagerExportAPITest() { - raylet_client_ = std::make_shared(); - client_pool_ = std::make_shared( - [this](const rpc::Address &) { return raylet_client_; }); - gcs_publisher_ = std::make_shared( - std::make_unique()); - gcs_table_storage_ = std::make_shared(io_service_); - - RayConfig::instance().initialize( - R"( -{ - "enable_export_api_write": true -} - )"); - log_dir_ = GenerateLogDir(); - const std::vector source_types = { - rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_NODE}; - RayEventInit_(source_types, - absl::flat_hash_map(), - log_dir_, - "warning", - false); - } - - virtual ~GcsNodeManagerExportAPITest() { - io_service_.stop(); - EventManager::Instance().ClearReporters(); - std::filesystem::remove_all(log_dir_.c_str()); - } - - protected: - std::shared_ptr gcs_table_storage_; - std::shared_ptr raylet_client_; - std::shared_ptr client_pool_; - std::shared_ptr gcs_publisher_; - instrumented_io_context io_service_; - std::string log_dir_; -}; - TEST_F(GcsNodeManagerTest, TestManagement) { gcs::GcsNodeManager node_manager( gcs_publisher_, gcs_table_storage_, client_pool_, ClusterID::Nil()); @@ -148,59 +96,6 @@ TEST_F(GcsNodeManagerTest, TestListener) { } } -TEST_F(GcsNodeManagerExportAPITest, TestExportEventRegisterNode) { - // Test export event is written when a node is added with HandleRegisterNode - gcs::GcsNodeManager node_manager( - gcs_publisher_, gcs_table_storage_, client_pool_, ClusterID::Nil()); - auto node = Mocker::GenNodeInfo(); - - rpc::RegisterNodeRequest register_request; - register_request.mutable_node_info()->CopyFrom(*node); - rpc::RegisterNodeReply register_reply; - auto send_reply_callback = - [](ray::Status status, std::function f1, std::function f2) {}; - - node_manager.HandleRegisterNode(register_request, ®ister_reply, send_reply_callback); - io_service_.poll(); - - std::vector vc; - Mocker::ReadContentFromFile(vc, log_dir_ + "/events/event_EXPORT_NODE.log"); - ASSERT_EQ((int)vc.size(), 1); - json event_data = json::parse(vc[0])["event_data"].get(); - ASSERT_EQ(event_data["state"], "ALIVE"); -} - -TEST_F(GcsNodeManagerExportAPITest, TestExportEventUnregisterNode) { - // Test export event is written when a node is removed with HandleUnregisterNode - gcs::GcsNodeManager node_manager( - gcs_publisher_, gcs_table_storage_, client_pool_, ClusterID::Nil()); - auto node = Mocker::GenNodeInfo(); - auto node_id = NodeID::FromBinary(node->node_id()); - node_manager.AddNode(node); - - rpc::UnregisterNodeRequest unregister_request; - unregister_request.set_node_id(node_id.Binary()); - unregister_request.mutable_node_death_info()->set_reason( - rpc::NodeDeathInfo::UNEXPECTED_TERMINATION); - unregister_request.mutable_node_death_info()->set_reason_message("mock reason message"); - rpc::UnregisterNodeReply unregister_reply; - auto send_reply_callback = - [](ray::Status status, std::function f1, std::function f2) {}; - - node_manager.HandleUnregisterNode( - unregister_request, &unregister_reply, send_reply_callback); - io_service_.poll(); - - std::vector vc; - Mocker::ReadContentFromFile(vc, log_dir_ + "/events/event_EXPORT_NODE.log"); - ASSERT_EQ((int)vc.size(), 1); - json event_data = json::parse(vc[0])["event_data"].get(); - ASSERT_EQ(event_data["state"], "DEAD"); - // Verify death cause for last node DEAD event - ASSERT_EQ(event_data["death_info"]["reason"], "UNEXPECTED_TERMINATION"); - ASSERT_EQ(event_data["death_info"]["reason_message"], "mock reason message"); -} - } // namespace ray int main(int argc, char **argv) { diff --git a/src/ray/gcs/test/gcs_test_util.h b/src/ray/gcs/test/gcs_test_util.h index 72565ba07564..81ec8d11ebbf 100644 --- a/src/ray/gcs/test/gcs_test_util.h +++ b/src/ray/gcs/test/gcs_test_util.h @@ -14,8 +14,6 @@ #pragma once -#include -#include #include #include @@ -428,16 +426,6 @@ struct Mocker { } return constraint; } - // Read all lines of a file into vector vc - static void ReadContentFromFile(std::vector &vc, std::string log_file) { - std::string line; - std::ifstream read_file; - read_file.open(log_file, std::ios::binary); - while (std::getline(read_file, line)) { - vc.push_back(line); - } - read_file.close(); - } }; } // namespace ray diff --git a/src/ray/util/event.cc b/src/ray/util/event.cc index c49a42d30a20..1a4709c3c1cd 100644 --- a/src/ray/util/event.cc +++ b/src/ray/util/event.cc @@ -130,16 +130,10 @@ std::string LogEventReporter::ExportEventToString(const rpc::ExportEvent &export std::string event_data_as_string; google::protobuf::util::JsonPrintOptions options; options.preserve_proto_field_names = true; - // Required so enum with value 0 is not omitted - options.always_print_primitive_fields = true; if (export_event.has_task_event_data()) { RAY_CHECK(google::protobuf::util::MessageToJsonString( export_event.task_event_data(), &event_data_as_string, options) .ok()); - } else if (export_event.has_node_event_data()) { - RAY_CHECK(google::protobuf::util::MessageToJsonString( - export_event.node_event_data(), &event_data_as_string, options) - .ok()); } else { RAY_LOG(FATAL) << "event_data missing from export event with id " << export_event.event_id() @@ -186,9 +180,7 @@ EventManager &EventManager::Instance() { return instance_; } -bool EventManager::IsEmpty() { - return reporter_map_.empty() && export_log_reporter_map_.empty(); -} +bool EventManager::IsEmpty() { return reporter_map_.empty(); } void EventManager::Publish(const rpc::Event &event, const json &custom_fields) { for (const auto &element : reporter_map_) {