Skip to content

Commit

Permalink
[observability][export-api] Write node events (ray-project#47422)
Browse files Browse the repository at this point in the history
Same code changes as [observability][export-api] Write node events ray-project#47221
Move test into a separate file to create a separate bazel target that can be skipped on Windows

Signed-off-by: ujjawal-khare <ujjawal.khare@dream11.com>
  • Loading branch information
nikitavemuri authored and ujjawal-khare committed Oct 15, 2024
1 parent d61c5b0 commit 4b64abb
Show file tree
Hide file tree
Showing 7 changed files with 259 additions and 3 deletions.
19 changes: 19 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1654,6 +1654,25 @@ ray_cc_test(
],
)

# Export-event test for GcsNodeManager. It lives in its own source file and
# its own target so that it can carry the "no_windows" tag and be skipped on
# Windows.
ray_cc_test(
name = "gcs_node_manager_export_event_test",
size = "small",
srcs = [
"src/ray/gcs/gcs_server/test/gcs_node_manager_export_event_test.cc",
],
tags = [
"no_windows",
"team:core"
],
deps = [
":gcs_server_lib",
":gcs_server_test_util",
":gcs_test_util_lib",
":ray_mock",
"@com_google_googletest//:gtest_main",
],
)

ray_cc_test(
name = "gcs_job_manager_test",
size = "small",
Expand Down
33 changes: 32 additions & 1 deletion src/ray/gcs/gcs_server/gcs_node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,36 @@ GcsNodeManager::GcsNodeManager(
raylet_client_pool_(std::move(raylet_client_pool)),
cluster_id_(cluster_id) {}

/// Write node_info as an export node event.
///
/// Does nothing unless enable_export_api_write() is enabled in RayConfig.
///
/// \param node_info The node information to export.
void GcsNodeManager::WriteNodeExportEvent(rpc::GcsNodeInfo node_info) const {
  if (!RayConfig::instance().enable_export_api_write()) {
    return;
  }

  auto export_data = std::make_shared<rpc::ExportNodeData>();

  // Fields that are copied over one-to-one.
  export_data->set_node_id(node_info.node_id());
  export_data->set_node_manager_address(node_info.node_manager_address());
  export_data->set_node_name(node_info.node_name());
  export_data->set_start_time_ms(node_info.start_time_ms());
  export_data->set_end_time_ms(node_info.end_time_ms());
  export_data->set_is_head_node(node_info.is_head_node());

  // Map fields are copied entry by entry.
  const auto &resources_total = node_info.resources_total();
  export_data->mutable_resources_total()->insert(resources_total.begin(),
                                                 resources_total.end());
  const auto &labels = node_info.labels();
  export_data->mutable_labels()->insert(labels.begin(), labels.end());

  // Enum fields go through the GCS -> export conversion helpers.
  export_data->set_state(ConvertGCSNodeStateToExport(node_info.state()));

  // death_info is only populated on the export message when the source
  // carries a reason or a reason message.
  const auto &death_info = node_info.death_info();
  const bool has_reason =
      death_info.reason() !=
      rpc::NodeDeathInfo_Reason::NodeDeathInfo_Reason_UNSPECIFIED;
  if (has_reason || !death_info.reason_message().empty()) {
    auto *export_death_info = export_data->mutable_death_info();
    export_death_info->set_reason_message(death_info.reason_message());
    export_death_info->set_reason(
        ConvertNodeDeathReasonToExport(death_info.reason()));
  }

  RayExportEvent(export_data).SendEvent();
}

// Note: ServerCall will populate the cluster_id.
void GcsNodeManager::HandleGetClusterId(rpc::GetClusterIdRequest request,
rpc::GetClusterIdReply *reply,
Expand Down Expand Up @@ -134,6 +164,7 @@ void GcsNodeManager::HandleUnregisterNode(rpc::UnregisterNodeRequest request,

auto on_put_done = [=](const Status &status) {
RAY_CHECK_OK(gcs_publisher_->PublishNodeInfo(node_id, *node_info_delta, nullptr));
WriteNodeExportEvent(*node);
};
RAY_CHECK_OK(gcs_table_storage_->NodeTable().Put(node_id, *node, on_put_done));
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
Expand Down Expand Up @@ -390,7 +421,7 @@ void GcsNodeManager::OnNodeFailure(const NodeID &node_id,
node_info_delta->set_end_time_ms(node->end_time_ms());
node_info_delta->mutable_death_info()->CopyFrom(node->death_info());

auto on_done = [this, node_id, node_table_updated_callback, node_info_delta](
auto on_done = [this, node_id, node_table_updated_callback, node_info_delta, node](
const Status &status) {
WriteNodeExportEvent(*node);
if (node_table_updated_callback != nullptr) {
Expand Down
45 changes: 45 additions & 0 deletions src/ray/gcs/gcs_server/gcs_node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "ray/rpc/gcs_server/gcs_rpc_server.h"
#include "ray/rpc/node_manager/node_manager_client.h"
#include "ray/rpc/node_manager/node_manager_client_pool.h"
#include "ray/util/event.h"
#include "src/ray/protobuf/gcs.pb.h"

namespace ray {
Expand Down Expand Up @@ -179,6 +180,50 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
/// \return The inferred death info of the node.
rpc::NodeDeathInfo InferDeathInfo(const NodeID &node_id);

void WriteNodeExportEvent(rpc::GcsNodeInfo node_info) const;

/// Map a GCS node state onto the corresponding export API node state.
///
/// \param node_state The state stored in rpc::GcsNodeInfo.
/// \return The matching rpc::ExportNodeData state. Any value other than
/// ALIVE or DEAD is reported through RAY_LOG(FATAL).
rpc::ExportNodeData::GcsNodeState ConvertGCSNodeStateToExport(
    rpc::GcsNodeInfo::GcsNodeState node_state) const {
  switch (node_state) {
  case rpc::GcsNodeInfo::ALIVE:
    return rpc::ExportNodeData::ALIVE;
  case rpc::GcsNodeInfo::DEAD:
    return rpc::ExportNodeData::DEAD;
  default:
    break;
  }
  // Unknown rpc::GcsNodeInfo::GcsNodeState value.
  RAY_LOG(FATAL) << "Invalid value for rpc::GcsNodeInfo::GcsNodeState "
                 << rpc::GcsNodeInfo::GcsNodeState_Name(node_state);
  // Fallback so that every control path returns a value.
  return rpc::ExportNodeData::DEAD;
}

/// Map a GCS node death reason onto the corresponding export API reason.
///
/// \param reason The reason stored in rpc::NodeDeathInfo.
/// \return The matching rpc::ExportNodeData::NodeDeathInfo reason. A value
/// not listed below is reported through RAY_LOG(FATAL).
rpc::ExportNodeData::NodeDeathInfo::Reason ConvertNodeDeathReasonToExport(
    rpc::NodeDeathInfo::Reason reason) const {
  switch (reason) {
  case rpc::NodeDeathInfo::UNSPECIFIED:
    return rpc::ExportNodeData::NodeDeathInfo::UNSPECIFIED;
  case rpc::NodeDeathInfo::EXPECTED_TERMINATION:
    return rpc::ExportNodeData::NodeDeathInfo::EXPECTED_TERMINATION;
  case rpc::NodeDeathInfo::UNEXPECTED_TERMINATION:
    return rpc::ExportNodeData::NodeDeathInfo::UNEXPECTED_TERMINATION;
  case rpc::NodeDeathInfo::AUTOSCALER_DRAIN_PREEMPTED:
    return rpc::ExportNodeData::NodeDeathInfo::AUTOSCALER_DRAIN_PREEMPTED;
  case rpc::NodeDeathInfo::AUTOSCALER_DRAIN_IDLE:
    return rpc::ExportNodeData::NodeDeathInfo::AUTOSCALER_DRAIN_IDLE;
  default:
    break;
  }
  // Unknown rpc::NodeDeathInfo::Reason value.
  RAY_LOG(FATAL) << "Invalid value for rpc::NodeDeathInfo::Reason "
                 << rpc::NodeDeathInfo::Reason_Name(reason);
  // Fallback so that every control path returns a value.
  return rpc::ExportNodeData::NodeDeathInfo::UNSPECIFIED;
}

/// Alive nodes.
absl::flat_hash_map<NodeID, std::shared_ptr<rpc::GcsNodeInfo>> alive_nodes_;
/// Draining nodes.
Expand Down
6 changes: 5 additions & 1 deletion src/ray/gcs/gcs_server/gcs_server_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,12 @@ int main(int argc, char *argv[]) {

// Initialize event framework.
if (RayConfig::instance().event_log_reporter_enabled() && !log_dir.empty()) {
// This GCS server process emits GCS standard events, and Node export events
// so the various source types are passed to RayEventInit. The type of an
// event is determined by the schema of its event data.
const std::vector<ray::SourceTypeVariant> source_types = {
ray::rpc::Event_SourceType::Event_SourceType_GCS};
ray::rpc::Event_SourceType::Event_SourceType_GCS,
ray::rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_NODE};
ray::RayEventInit(source_types,
absl::flat_hash_map<std::string, std::string>(),
log_dir,
Expand Down
137 changes: 137 additions & 0 deletions src/ray/gcs/gcs_server/test/gcs_node_manager_export_event_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <chrono>
#include <memory>
#include <thread>

// clang-format off
#include "gtest/gtest.h"
#include "ray/gcs/gcs_server/test/gcs_server_test_util.h"
#include "ray/gcs/test/gcs_test_util.h"
#include "ray/rpc/node_manager/node_manager_client.h"
#include "ray/rpc/node_manager/node_manager_client_pool.h"
#include "mock/ray/pubsub/publisher.h"
#include "ray/util/event.h"
// clang-format on

namespace ray {

/// Build a log directory name of the form "event<hex>", where <hex> is the
/// hex encoding of a 5-character buffer filled by FillRandom.
std::string GenerateLogDir() {
  std::string random_bytes(5, ' ');
  FillRandom(&random_bytes);
  return "event" + StringToHex(random_bytes);
}

/// Test fixture for GcsNodeManager export events.
///
/// The constructor wires up a mock raylet client, a client pool that always
/// hands out that client, a publisher backed by a mock, and in-memory table
/// storage. It then turns on "enable_export_api_write" in RayConfig and
/// initializes the event framework for the EXPORT_NODE source type, writing
/// into a freshly generated log directory.
///
/// The destructor stops the io service, clears the registered event reporters
/// and removes the log directory.
class GcsNodeManagerExportAPITest : public ::testing::Test {
 public:
  GcsNodeManagerExportAPITest() {
    raylet_client_ = std::make_shared<GcsServerMocker::MockRayletClient>();
    client_pool_ = std::make_shared<rpc::NodeManagerClientPool>(
        [this](const rpc::Address &) { return raylet_client_; });
    gcs_publisher_ = std::make_shared<gcs::GcsPublisher>(
        std::make_unique<ray::pubsub::MockPublisher>());
    gcs_table_storage_ = std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_);

    // Export events are only written when this flag is enabled.
    RayConfig::instance().initialize(
        R"(
{
"enable_export_api_write": true
}
)");
    log_dir_ = GenerateLogDir();
    const std::vector<ray::SourceTypeVariant> source_types = {
        rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_NODE};
    RayEventInit_(source_types,
                  absl::flat_hash_map<std::string, std::string>(),
                  log_dir_,
                  "warning",
                  false);
  }

  ~GcsNodeManagerExportAPITest() override {
    io_service_.stop();
    EventManager::Instance().ClearReporters();
    // Use the non-throwing overload: an exception escaping a destructor would
    // terminate the whole test binary instead of failing a single test.
    std::error_code remove_error;
    std::filesystem::remove_all(log_dir_, remove_error);
    if (remove_error) {
      RAY_LOG(WARNING) << "Failed to remove export event log directory " << log_dir_
                       << ": " << remove_error.message();
    }
  }

 protected:
  std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
  std::shared_ptr<GcsServerMocker::MockRayletClient> raylet_client_;
  std::shared_ptr<rpc::NodeManagerClientPool> client_pool_;
  std::shared_ptr<gcs::GcsPublisher> gcs_publisher_;
  instrumented_io_context io_service_;
  // Directory the export event log files are written to; removed on teardown.
  std::string log_dir_;
};

TEST_F(GcsNodeManagerExportAPITest, TestExportEventRegisterNode) {
  // Registering a node through HandleRegisterNode must write exactly one
  // export event, and that event must report the node as ALIVE.
  gcs::GcsNodeManager node_manager(
      gcs_publisher_, gcs_table_storage_, client_pool_, ClusterID::Nil());

  auto node_info = Mocker::GenNodeInfo();
  rpc::RegisterNodeRequest request;
  *request.mutable_node_info() = *node_info;
  rpc::RegisterNodeReply reply;
  // The reply itself is not under test, so the callback does nothing.
  auto noop_reply_callback =
      [](ray::Status, std::function<void()>, std::function<void()>) {};

  node_manager.HandleRegisterNode(request, &reply, noop_reply_callback);
  io_service_.poll();

  std::vector<std::string> log_lines;
  Mocker::ReadContentFromFile(log_lines, log_dir_ + "/events/event_EXPORT_NODE.log");
  ASSERT_EQ(log_lines.size(), 1U);
  json event_data = json::parse(log_lines.front())["event_data"].get<json>();
  ASSERT_EQ(event_data["state"], "ALIVE");
}

TEST_F(GcsNodeManagerExportAPITest, TestExportEventUnregisterNode) {
  // Removing a node through HandleUnregisterNode must write exactly one
  // export event that reports the node as DEAD together with its death info.
  gcs::GcsNodeManager node_manager(
      gcs_publisher_, gcs_table_storage_, client_pool_, ClusterID::Nil());

  auto node_info = Mocker::GenNodeInfo();
  const auto node_id = NodeID::FromBinary(node_info->node_id());
  node_manager.AddNode(node_info);

  rpc::UnregisterNodeRequest request;
  request.set_node_id(node_id.Binary());
  auto *death_info = request.mutable_node_death_info();
  death_info->set_reason(rpc::NodeDeathInfo::UNEXPECTED_TERMINATION);
  death_info->set_reason_message("mock reason message");
  rpc::UnregisterNodeReply reply;
  // The reply itself is not under test, so the callback does nothing.
  auto noop_reply_callback =
      [](ray::Status, std::function<void()>, std::function<void()>) {};

  node_manager.HandleUnregisterNode(request, &reply, noop_reply_callback);
  io_service_.poll();

  std::vector<std::string> log_lines;
  Mocker::ReadContentFromFile(log_lines, log_dir_ + "/events/event_EXPORT_NODE.log");
  ASSERT_EQ(log_lines.size(), 1U);
  json event_data = json::parse(log_lines.front())["event_data"].get<json>();
  ASSERT_EQ(event_data["state"], "DEAD");
  // Verify death cause for last node DEAD event
  json &exported_death_info = event_data["death_info"];
  ASSERT_EQ(exported_death_info["reason"], "UNEXPECTED_TERMINATION");
  ASSERT_EQ(exported_death_info["reason_message"], "mock reason message");
}

} // namespace ray

// Test entry point: initializes GoogleTest with the command-line arguments
// and returns the result of running all tests.
int main(int argc, char **argv) {
  ::testing::InitGoogleTest(&argc, argv);
  const int exit_code = RUN_ALL_TESTS();
  return exit_code;
}
12 changes: 12 additions & 0 deletions src/ray/gcs/test/gcs_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#pragma once

#include <filesystem>
#include <fstream>
#include <memory>
#include <utility>

Expand Down Expand Up @@ -426,6 +428,16 @@ struct Mocker {
}
return constraint;
}
// Append every line of the file at log_file to vc, in file order. Existing
// entries of vc are kept. If the file cannot be opened, vc is left unchanged.
static void ReadContentFromFile(std::vector<std::string> &vc, std::string log_file) {
  // The stream is closed when it goes out of scope.
  std::ifstream input(log_file, std::ios::binary);
  for (std::string current_line; std::getline(input, current_line);) {
    vc.push_back(current_line);
  }
}
};

} // namespace ray
10 changes: 9 additions & 1 deletion src/ray/util/event.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,16 @@ std::string LogEventReporter::ExportEventToString(const rpc::ExportEvent &export
std::string event_data_as_string;
google::protobuf::util::JsonPrintOptions options;
options.preserve_proto_field_names = true;
// Required so enum with value 0 is not omitted
options.always_print_primitive_fields = true;
if (export_event.has_task_event_data()) {
RAY_CHECK(google::protobuf::util::MessageToJsonString(
export_event.task_event_data(), &event_data_as_string, options)
.ok());
} else if (export_event.has_node_event_data()) {
RAY_CHECK(google::protobuf::util::MessageToJsonString(
export_event.node_event_data(), &event_data_as_string, options)
.ok());
} else {
RAY_LOG(FATAL)
<< "event_data missing from export event with id " << export_event.event_id()
Expand Down Expand Up @@ -180,7 +186,9 @@ EventManager &EventManager::Instance() {
return instance_;
}

bool EventManager::IsEmpty() { return reporter_map_.empty(); }
// Returns true only when neither reporter_map_ nor export_log_reporter_map_
// holds any entry.
bool EventManager::IsEmpty() {
  const bool no_event_reporters = reporter_map_.empty();
  const bool no_export_reporters = export_log_reporter_map_.empty();
  return no_event_reporters && no_export_reporters;
}

void EventManager::Publish(const rpc::Event &event, const json &custom_fields) {
for (const auto &element : reporter_map_) {
Expand Down

0 comments on commit 4b64abb

Please sign in to comment.