From 75fb9f8c61e46748b6af940c64e0fece7bb3205e Mon Sep 17 00:00:00 2001 From: Cuong Nguyen <128072568+can-anyscale@users.noreply.github.com> Date: Thu, 29 Aug 2024 10:45:19 -0700 Subject: [PATCH] Revert "[observability][export-api] Write node events" (#47405) Reverts ray-project/ray#47221 This broke https://github.com/ray-project/ray/issues/47395 Signed-off-by: ujjawal-khare --- src/ray/gcs/gcs_server/gcs_node_manager.cc | 35 +----- src/ray/gcs/gcs_server/gcs_node_manager.h | 45 -------- .../gcs_server/test/gcs_node_manager_test.cc | 105 ------------------ src/ray/gcs/test/gcs_test_util.h | 12 -- 4 files changed, 1 insertion(+), 196 deletions(-) diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index 0d91c06439c2..197536e1f2be 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -39,36 +39,6 @@ GcsNodeManager::GcsNodeManager( raylet_client_pool_(std::move(raylet_client_pool)), cluster_id_(cluster_id) {} -void GcsNodeManager::WriteNodeExportEvent(rpc::GcsNodeInfo node_info) const { - /// Write node_info as a export node event if - /// enable_export_api_write() is enabled. - if (!RayConfig::instance().enable_export_api_write()) { - return; - } - std::shared_ptr export_node_data_ptr = - std::make_shared(); - export_node_data_ptr->set_node_id(node_info.node_id()); - export_node_data_ptr->set_node_manager_address(node_info.node_manager_address()); - export_node_data_ptr->mutable_resources_total()->insert( - node_info.resources_total().begin(), node_info.resources_total().end()); - export_node_data_ptr->set_node_name(node_info.node_name()); - export_node_data_ptr->set_start_time_ms(node_info.start_time_ms()); - export_node_data_ptr->set_end_time_ms(node_info.end_time_ms()); - export_node_data_ptr->set_is_head_node(node_info.is_head_node()); - export_node_data_ptr->mutable_labels()->insert(node_info.labels().begin(), - node_info.labels().end()); - export_node_data_ptr->set_state(ConvertGCSNodeStateToExport(node_info.state())); - if (!node_info.death_info().reason_message().empty() || - node_info.death_info().reason() != - rpc::NodeDeathInfo_Reason::NodeDeathInfo_Reason_UNSPECIFIED) { - export_node_data_ptr->mutable_death_info()->set_reason_message( - node_info.death_info().reason_message()); - export_node_data_ptr->mutable_death_info()->set_reason( - ConvertNodeDeathReasonToExport(node_info.death_info().reason())); - } - RayExportEvent(export_node_data_ptr).SendEvent(); -} - // Note: ServerCall will populate the cluster_id. void GcsNodeManager::HandleGetClusterId(rpc::GetClusterIdRequest request, rpc::GetClusterIdReply *reply, @@ -96,7 +66,6 @@ void GcsNodeManager::HandleRegisterNode(rpc::RegisterNodeRequest request, AddNode(std::make_shared(request.node_info())); WriteNodeExportEvent(request.node_info()); GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); - WriteNodeExportEvent(request.node_info()); }; if (request.node_info().is_head_node()) { // mark all old head nodes as dead if exists: @@ -165,7 +134,6 @@ void GcsNodeManager::HandleUnregisterNode(rpc::UnregisterNodeRequest request, auto on_put_done = [=](const Status &status) { RAY_CHECK_OK(gcs_publisher_->PublishNodeInfo(node_id, *node_info_delta, nullptr)); - WriteNodeExportEvent(*node); }; RAY_CHECK_OK(gcs_table_storage_->NodeTable().Put(node_id, *node, on_put_done)); GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); @@ -422,14 +390,13 @@ void GcsNodeManager::OnNodeFailure(const NodeID &node_id, node_info_delta->set_end_time_ms(node->end_time_ms()); node_info_delta->mutable_death_info()->CopyFrom(node->death_info()); - auto on_done = [this, node_id, node_table_updated_callback, node_info_delta, node]( + auto on_done = [this, node_id, node_table_updated_callback, node_info_delta]( const Status &status) { WriteNodeExportEvent(*node); if (node_table_updated_callback != nullptr) { node_table_updated_callback(Status::OK()); } RAY_CHECK_OK(gcs_publisher_->PublishNodeInfo(node_id, *node_info_delta, nullptr)); - WriteNodeExportEvent(*node); }; RAY_CHECK_OK(gcs_table_storage_->NodeTable().Put(node_id, *node, on_done)); } else if (node_table_updated_callback != nullptr) { diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.h b/src/ray/gcs/gcs_server/gcs_node_manager.h index db258d4cb00c..351e0c4973e7 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.h +++ b/src/ray/gcs/gcs_server/gcs_node_manager.h @@ -31,7 +31,6 @@ #include "ray/rpc/gcs_server/gcs_rpc_server.h" #include "ray/rpc/node_manager/node_manager_client.h" #include "ray/rpc/node_manager/node_manager_client_pool.h" -#include "ray/util/event.h" #include "src/ray/protobuf/gcs.pb.h" namespace ray { @@ -180,50 +179,6 @@ class GcsNodeManager : public rpc::NodeInfoHandler { /// \return The inferred death info of the node. rpc::NodeDeathInfo InferDeathInfo(const NodeID &node_id); - void WriteNodeExportEvent(rpc::GcsNodeInfo node_info) const; - - rpc::ExportNodeData::GcsNodeState ConvertGCSNodeStateToExport( - rpc::GcsNodeInfo::GcsNodeState node_state) const { - switch (node_state) { - case rpc::GcsNodeInfo_GcsNodeState::GcsNodeInfo_GcsNodeState_ALIVE: - return rpc::ExportNodeData_GcsNodeState::ExportNodeData_GcsNodeState_ALIVE; - case rpc::GcsNodeInfo_GcsNodeState::GcsNodeInfo_GcsNodeState_DEAD: - return rpc::ExportNodeData_GcsNodeState::ExportNodeData_GcsNodeState_DEAD; - default: - // Unknown rpc::GcsNodeInfo::GcsNodeState value - RAY_LOG(FATAL) << "Invalid value for rpc::GcsNodeInfo::GcsNodeState " - << rpc::GcsNodeInfo::GcsNodeState_Name(node_state); - return rpc::ExportNodeData_GcsNodeState::ExportNodeData_GcsNodeState_DEAD; - } - } - - rpc::ExportNodeData::NodeDeathInfo::Reason ConvertNodeDeathReasonToExport( - rpc::NodeDeathInfo::Reason reason) const { - switch (reason) { - case rpc::NodeDeathInfo_Reason::NodeDeathInfo_Reason_UNSPECIFIED: - return rpc::ExportNodeData_NodeDeathInfo_Reason:: - ExportNodeData_NodeDeathInfo_Reason_UNSPECIFIED; - case rpc::NodeDeathInfo_Reason::NodeDeathInfo_Reason_EXPECTED_TERMINATION: - return rpc::ExportNodeData_NodeDeathInfo_Reason:: - ExportNodeData_NodeDeathInfo_Reason_EXPECTED_TERMINATION; - case rpc::NodeDeathInfo_Reason::NodeDeathInfo_Reason_UNEXPECTED_TERMINATION: - return rpc::ExportNodeData_NodeDeathInfo_Reason:: - ExportNodeData_NodeDeathInfo_Reason_UNEXPECTED_TERMINATION; - case rpc::NodeDeathInfo_Reason::NodeDeathInfo_Reason_AUTOSCALER_DRAIN_PREEMPTED: - return rpc::ExportNodeData_NodeDeathInfo_Reason:: - ExportNodeData_NodeDeathInfo_Reason_AUTOSCALER_DRAIN_PREEMPTED; - case rpc::NodeDeathInfo_Reason::NodeDeathInfo_Reason_AUTOSCALER_DRAIN_IDLE: - return rpc::ExportNodeData_NodeDeathInfo_Reason:: - ExportNodeData_NodeDeathInfo_Reason_AUTOSCALER_DRAIN_IDLE; - default: - // Unknown rpc::GcsNodeInfo::GcsNodeState value - RAY_LOG(FATAL) << "Invalid value for rpc::NodeDeathInfo::Reason " - << rpc::NodeDeathInfo::Reason_Name(reason); - return rpc::ExportNodeData_NodeDeathInfo_Reason:: - ExportNodeData_NodeDeathInfo_Reason_UNSPECIFIED; - } - } - /// Alive nodes. absl::flat_hash_map> alive_nodes_; /// Draining nodes. diff --git a/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc index e13a347e8007..a8a0157e0d54 100644 --- a/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc @@ -12,9 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include #include -#include // clang-format off #include "gtest/gtest.h" @@ -23,11 +21,9 @@ #include "ray/rpc/node_manager/node_manager_client.h" #include "ray/rpc/node_manager/node_manager_client_pool.h" #include "mock/ray/pubsub/publisher.h" -#include "ray/util/event.h" // clang-format on namespace ray { - class GcsNodeManagerTest : public ::testing::Test { public: GcsNodeManagerTest() { @@ -45,54 +41,6 @@ class GcsNodeManagerTest : public ::testing::Test { std::shared_ptr gcs_publisher_; }; -std::string GenerateLogDir() { - std::string log_dir_generate = std::string(5, ' '); - FillRandom(&log_dir_generate); - std::string log_dir = "event" + StringToHex(log_dir_generate); - return log_dir; -} - -class GcsNodeManagerExportAPITest : public ::testing::Test { - public: - GcsNodeManagerExportAPITest() { - raylet_client_ = std::make_shared(); - client_pool_ = std::make_shared( - [this](const rpc::Address &) { return raylet_client_; }); - gcs_publisher_ = std::make_shared( - std::make_unique()); - gcs_table_storage_ = std::make_shared(io_service_); - - RayConfig::instance().initialize( - R"( -{ - "enable_export_api_write": true -} - )"); - log_dir_ = GenerateLogDir(); - const std::vector source_types = { - rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_NODE}; - RayEventInit_(source_types, - absl::flat_hash_map(), - log_dir_, - "warning", - false); - } - - virtual ~GcsNodeManagerExportAPITest() { - io_service_.stop(); - EventManager::Instance().ClearReporters(); - std::filesystem::remove_all(log_dir_.c_str()); - } - - protected: - std::shared_ptr gcs_table_storage_; - std::shared_ptr raylet_client_; - std::shared_ptr client_pool_; - std::shared_ptr gcs_publisher_; - instrumented_io_context io_service_; - std::string log_dir_; -}; - TEST_F(GcsNodeManagerTest, TestManagement) { gcs::GcsNodeManager node_manager( gcs_publisher_, gcs_table_storage_, client_pool_, ClusterID::Nil()); @@ -148,59 +96,6 @@ TEST_F(GcsNodeManagerTest, TestListener) { } } -TEST_F(GcsNodeManagerExportAPITest, TestExportEventRegisterNode) { - // Test export event is written when a node is added with HandleRegisterNode - gcs::GcsNodeManager node_manager( - gcs_publisher_, gcs_table_storage_, client_pool_, ClusterID::Nil()); - auto node = Mocker::GenNodeInfo(); - - rpc::RegisterNodeRequest register_request; - register_request.mutable_node_info()->CopyFrom(*node); - rpc::RegisterNodeReply register_reply; - auto send_reply_callback = - [](ray::Status status, std::function f1, std::function f2) {}; - - node_manager.HandleRegisterNode(register_request, ®ister_reply, send_reply_callback); - io_service_.poll(); - - std::vector vc; - Mocker::ReadContentFromFile(vc, log_dir_ + "/events/event_EXPORT_NODE.log"); - ASSERT_EQ((int)vc.size(), 1); - json event_data = json::parse(vc[0])["event_data"].get(); - ASSERT_EQ(event_data["state"], "ALIVE"); -} - -TEST_F(GcsNodeManagerExportAPITest, TestExportEventUnregisterNode) { - // Test export event is written when a node is removed with HandleUnregisterNode - gcs::GcsNodeManager node_manager( - gcs_publisher_, gcs_table_storage_, client_pool_, ClusterID::Nil()); - auto node = Mocker::GenNodeInfo(); - auto node_id = NodeID::FromBinary(node->node_id()); - node_manager.AddNode(node); - - rpc::UnregisterNodeRequest unregister_request; - unregister_request.set_node_id(node_id.Binary()); - unregister_request.mutable_node_death_info()->set_reason( - rpc::NodeDeathInfo::UNEXPECTED_TERMINATION); - unregister_request.mutable_node_death_info()->set_reason_message("mock reason message"); - rpc::UnregisterNodeReply unregister_reply; - auto send_reply_callback = - [](ray::Status status, std::function f1, std::function f2) {}; - - node_manager.HandleUnregisterNode( - unregister_request, &unregister_reply, send_reply_callback); - io_service_.poll(); - - std::vector vc; - Mocker::ReadContentFromFile(vc, log_dir_ + "/events/event_EXPORT_NODE.log"); - ASSERT_EQ((int)vc.size(), 1); - json event_data = json::parse(vc[0])["event_data"].get(); - ASSERT_EQ(event_data["state"], "DEAD"); - // Verify death cause for last node DEAD event - ASSERT_EQ(event_data["death_info"]["reason"], "UNEXPECTED_TERMINATION"); - ASSERT_EQ(event_data["death_info"]["reason_message"], "mock reason message"); -} - } // namespace ray int main(int argc, char **argv) { diff --git a/src/ray/gcs/test/gcs_test_util.h b/src/ray/gcs/test/gcs_test_util.h index 72565ba07564..81ec8d11ebbf 100644 --- a/src/ray/gcs/test/gcs_test_util.h +++ b/src/ray/gcs/test/gcs_test_util.h @@ -14,8 +14,6 @@ #pragma once -#include -#include #include #include @@ -428,16 +426,6 @@ struct Mocker { } return constraint; } - // Read all lines of a file into vector vc - static void ReadContentFromFile(std::vector &vc, std::string log_file) { - std::string line; - std::ifstream read_file; - read_file.open(log_file, std::ios::binary); - while (std::getline(read_file, line)) { - vc.push_back(line); - } - read_file.close(); - } }; } // namespace ray