Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "[observability][export-api] Write node events" #47405

Merged
merged 1 commit into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 1 addition & 34 deletions src/ray/gcs/gcs_server/gcs_node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,36 +39,6 @@ GcsNodeManager::GcsNodeManager(
raylet_client_pool_(std::move(raylet_client_pool)),
cluster_id_(cluster_id) {}

void GcsNodeManager::WriteNodeExportEvent(rpc::GcsNodeInfo node_info) const {
/// Write node_info as a export node event if
/// enable_export_api_write() is enabled.
if (!RayConfig::instance().enable_export_api_write()) {
return;
}
std::shared_ptr<rpc::ExportNodeData> export_node_data_ptr =
std::make_shared<rpc::ExportNodeData>();
export_node_data_ptr->set_node_id(node_info.node_id());
export_node_data_ptr->set_node_manager_address(node_info.node_manager_address());
export_node_data_ptr->mutable_resources_total()->insert(
node_info.resources_total().begin(), node_info.resources_total().end());
export_node_data_ptr->set_node_name(node_info.node_name());
export_node_data_ptr->set_start_time_ms(node_info.start_time_ms());
export_node_data_ptr->set_end_time_ms(node_info.end_time_ms());
export_node_data_ptr->set_is_head_node(node_info.is_head_node());
export_node_data_ptr->mutable_labels()->insert(node_info.labels().begin(),
node_info.labels().end());
export_node_data_ptr->set_state(ConvertGCSNodeStateToExport(node_info.state()));
if (!node_info.death_info().reason_message().empty() ||
node_info.death_info().reason() !=
rpc::NodeDeathInfo_Reason::NodeDeathInfo_Reason_UNSPECIFIED) {
export_node_data_ptr->mutable_death_info()->set_reason_message(
node_info.death_info().reason_message());
export_node_data_ptr->mutable_death_info()->set_reason(
ConvertNodeDeathReasonToExport(node_info.death_info().reason()));
}
RayExportEvent(export_node_data_ptr).SendEvent();
}

// Note: ServerCall will populate the cluster_id.
void GcsNodeManager::HandleGetClusterId(rpc::GetClusterIdRequest request,
rpc::GetClusterIdReply *reply,
Expand All @@ -94,7 +64,6 @@ void GcsNodeManager::HandleRegisterNode(rpc::RegisterNodeRequest request,
RAY_CHECK_OK(gcs_publisher_->PublishNodeInfo(node_id, request.node_info(), nullptr));
AddNode(std::make_shared<rpc::GcsNodeInfo>(request.node_info()));
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
WriteNodeExportEvent(request.node_info());
};
if (request.node_info().is_head_node()) {
// mark all old head nodes as dead if exists:
Expand Down Expand Up @@ -163,7 +132,6 @@ void GcsNodeManager::HandleUnregisterNode(rpc::UnregisterNodeRequest request,

auto on_put_done = [=](const Status &status) {
RAY_CHECK_OK(gcs_publisher_->PublishNodeInfo(node_id, *node_info_delta, nullptr));
WriteNodeExportEvent(*node);
};
RAY_CHECK_OK(gcs_table_storage_->NodeTable().Put(node_id, *node, on_put_done));
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
Expand Down Expand Up @@ -435,13 +403,12 @@ void GcsNodeManager::OnNodeFailure(const NodeID &node_id,
node_info_delta->set_end_time_ms(node->end_time_ms());
node_info_delta->mutable_death_info()->CopyFrom(node->death_info());

auto on_done = [this, node_id, node_table_updated_callback, node_info_delta, node](
auto on_done = [this, node_id, node_table_updated_callback, node_info_delta](
const Status &status) {
if (node_table_updated_callback != nullptr) {
node_table_updated_callback(Status::OK());
}
RAY_CHECK_OK(gcs_publisher_->PublishNodeInfo(node_id, *node_info_delta, nullptr));
WriteNodeExportEvent(*node);
};
RAY_CHECK_OK(gcs_table_storage_->NodeTable().Put(node_id, *node, on_done));
} else if (node_table_updated_callback != nullptr) {
Expand Down
45 changes: 0 additions & 45 deletions src/ray/gcs/gcs_server/gcs_node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
#include "ray/rpc/gcs_server/gcs_rpc_server.h"
#include "ray/rpc/node_manager/node_manager_client.h"
#include "ray/rpc/node_manager/node_manager_client_pool.h"
#include "ray/util/event.h"
#include "src/ray/protobuf/gcs.pb.h"

namespace ray {
Expand Down Expand Up @@ -185,50 +184,6 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
/// \return The inferred death info of the node.
rpc::NodeDeathInfo InferDeathInfo(const NodeID &node_id);

void WriteNodeExportEvent(rpc::GcsNodeInfo node_info) const;

rpc::ExportNodeData::GcsNodeState ConvertGCSNodeStateToExport(
rpc::GcsNodeInfo::GcsNodeState node_state) const {
switch (node_state) {
case rpc::GcsNodeInfo_GcsNodeState::GcsNodeInfo_GcsNodeState_ALIVE:
return rpc::ExportNodeData_GcsNodeState::ExportNodeData_GcsNodeState_ALIVE;
case rpc::GcsNodeInfo_GcsNodeState::GcsNodeInfo_GcsNodeState_DEAD:
return rpc::ExportNodeData_GcsNodeState::ExportNodeData_GcsNodeState_DEAD;
default:
// Unknown rpc::GcsNodeInfo::GcsNodeState value
RAY_LOG(FATAL) << "Invalid value for rpc::GcsNodeInfo::GcsNodeState "
<< rpc::GcsNodeInfo::GcsNodeState_Name(node_state);
return rpc::ExportNodeData_GcsNodeState::ExportNodeData_GcsNodeState_DEAD;
}
}

rpc::ExportNodeData::NodeDeathInfo::Reason ConvertNodeDeathReasonToExport(
rpc::NodeDeathInfo::Reason reason) const {
switch (reason) {
case rpc::NodeDeathInfo_Reason::NodeDeathInfo_Reason_UNSPECIFIED:
return rpc::ExportNodeData_NodeDeathInfo_Reason::
ExportNodeData_NodeDeathInfo_Reason_UNSPECIFIED;
case rpc::NodeDeathInfo_Reason::NodeDeathInfo_Reason_EXPECTED_TERMINATION:
return rpc::ExportNodeData_NodeDeathInfo_Reason::
ExportNodeData_NodeDeathInfo_Reason_EXPECTED_TERMINATION;
case rpc::NodeDeathInfo_Reason::NodeDeathInfo_Reason_UNEXPECTED_TERMINATION:
return rpc::ExportNodeData_NodeDeathInfo_Reason::
ExportNodeData_NodeDeathInfo_Reason_UNEXPECTED_TERMINATION;
case rpc::NodeDeathInfo_Reason::NodeDeathInfo_Reason_AUTOSCALER_DRAIN_PREEMPTED:
return rpc::ExportNodeData_NodeDeathInfo_Reason::
ExportNodeData_NodeDeathInfo_Reason_AUTOSCALER_DRAIN_PREEMPTED;
case rpc::NodeDeathInfo_Reason::NodeDeathInfo_Reason_AUTOSCALER_DRAIN_IDLE:
return rpc::ExportNodeData_NodeDeathInfo_Reason::
ExportNodeData_NodeDeathInfo_Reason_AUTOSCALER_DRAIN_IDLE;
default:
// Unknown rpc::GcsNodeInfo::GcsNodeState value
RAY_LOG(FATAL) << "Invalid value for rpc::NodeDeathInfo::Reason "
<< rpc::NodeDeathInfo::Reason_Name(reason);
return rpc::ExportNodeData_NodeDeathInfo_Reason::
ExportNodeData_NodeDeathInfo_Reason_UNSPECIFIED;
}
}

/// Alive nodes.
absl::flat_hash_map<NodeID, std::shared_ptr<rpc::GcsNodeInfo>> alive_nodes_;
/// Draining nodes.
Expand Down
6 changes: 1 addition & 5 deletions src/ray/gcs/gcs_server/gcs_server_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,8 @@ int main(int argc, char *argv[]) {

// Initialize event framework.
if (RayConfig::instance().event_log_reporter_enabled() && !log_dir.empty()) {
// This GCS server process emits GCS standard events, and Node export events
// so the various source types are passed to RayEventInit. The type of an
// event is determined by the schema of its event data.
const std::vector<ray::SourceTypeVariant> source_types = {
ray::rpc::Event_SourceType::Event_SourceType_GCS,
ray::rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_NODE};
ray::rpc::Event_SourceType::Event_SourceType_GCS};
ray::RayEventInit(source_types,
absl::flat_hash_map<std::string, std::string>(),
log_dir,
Expand Down
105 changes: 0 additions & 105 deletions src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <chrono>
#include <memory>
#include <thread>

// clang-format off
#include "gtest/gtest.h"
Expand All @@ -23,11 +21,9 @@
#include "ray/rpc/node_manager/node_manager_client.h"
#include "ray/rpc/node_manager/node_manager_client_pool.h"
#include "mock/ray/pubsub/publisher.h"
#include "ray/util/event.h"
// clang-format on

namespace ray {

class GcsNodeManagerTest : public ::testing::Test {
public:
GcsNodeManagerTest() {
Expand All @@ -45,54 +41,6 @@ class GcsNodeManagerTest : public ::testing::Test {
std::shared_ptr<gcs::GcsPublisher> gcs_publisher_;
};

std::string GenerateLogDir() {
std::string log_dir_generate = std::string(5, ' ');
FillRandom(&log_dir_generate);
std::string log_dir = "event" + StringToHex(log_dir_generate);
return log_dir;
}

class GcsNodeManagerExportAPITest : public ::testing::Test {
public:
GcsNodeManagerExportAPITest() {
raylet_client_ = std::make_shared<GcsServerMocker::MockRayletClient>();
client_pool_ = std::make_shared<rpc::NodeManagerClientPool>(
[this](const rpc::Address &) { return raylet_client_; });
gcs_publisher_ = std::make_shared<gcs::GcsPublisher>(
std::make_unique<ray::pubsub::MockPublisher>());
gcs_table_storage_ = std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_);

RayConfig::instance().initialize(
R"(
{
"enable_export_api_write": true
}
)");
log_dir_ = GenerateLogDir();
const std::vector<ray::SourceTypeVariant> source_types = {
rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_NODE};
RayEventInit_(source_types,
absl::flat_hash_map<std::string, std::string>(),
log_dir_,
"warning",
false);
}

virtual ~GcsNodeManagerExportAPITest() {
io_service_.stop();
EventManager::Instance().ClearReporters();
std::filesystem::remove_all(log_dir_.c_str());
}

protected:
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
std::shared_ptr<GcsServerMocker::MockRayletClient> raylet_client_;
std::shared_ptr<rpc::NodeManagerClientPool> client_pool_;
std::shared_ptr<gcs::GcsPublisher> gcs_publisher_;
instrumented_io_context io_service_;
std::string log_dir_;
};

TEST_F(GcsNodeManagerTest, TestManagement) {
gcs::GcsNodeManager node_manager(
gcs_publisher_, gcs_table_storage_, client_pool_, ClusterID::Nil());
Expand Down Expand Up @@ -148,59 +96,6 @@ TEST_F(GcsNodeManagerTest, TestListener) {
}
}

TEST_F(GcsNodeManagerExportAPITest, TestExportEventRegisterNode) {
// Test export event is written when a node is added with HandleRegisterNode
gcs::GcsNodeManager node_manager(
gcs_publisher_, gcs_table_storage_, client_pool_, ClusterID::Nil());
auto node = Mocker::GenNodeInfo();

rpc::RegisterNodeRequest register_request;
register_request.mutable_node_info()->CopyFrom(*node);
rpc::RegisterNodeReply register_reply;
auto send_reply_callback =
[](ray::Status status, std::function<void()> f1, std::function<void()> f2) {};

node_manager.HandleRegisterNode(register_request, &register_reply, send_reply_callback);
io_service_.poll();

std::vector<std::string> vc;
Mocker::ReadContentFromFile(vc, log_dir_ + "/events/event_EXPORT_NODE.log");
ASSERT_EQ((int)vc.size(), 1);
json event_data = json::parse(vc[0])["event_data"].get<json>();
ASSERT_EQ(event_data["state"], "ALIVE");
}

TEST_F(GcsNodeManagerExportAPITest, TestExportEventUnregisterNode) {
// Test export event is written when a node is removed with HandleUnregisterNode
gcs::GcsNodeManager node_manager(
gcs_publisher_, gcs_table_storage_, client_pool_, ClusterID::Nil());
auto node = Mocker::GenNodeInfo();
auto node_id = NodeID::FromBinary(node->node_id());
node_manager.AddNode(node);

rpc::UnregisterNodeRequest unregister_request;
unregister_request.set_node_id(node_id.Binary());
unregister_request.mutable_node_death_info()->set_reason(
rpc::NodeDeathInfo::UNEXPECTED_TERMINATION);
unregister_request.mutable_node_death_info()->set_reason_message("mock reason message");
rpc::UnregisterNodeReply unregister_reply;
auto send_reply_callback =
[](ray::Status status, std::function<void()> f1, std::function<void()> f2) {};

node_manager.HandleUnregisterNode(
unregister_request, &unregister_reply, send_reply_callback);
io_service_.poll();

std::vector<std::string> vc;
Mocker::ReadContentFromFile(vc, log_dir_ + "/events/event_EXPORT_NODE.log");
ASSERT_EQ((int)vc.size(), 1);
json event_data = json::parse(vc[0])["event_data"].get<json>();
ASSERT_EQ(event_data["state"], "DEAD");
// Verify death cause for last node DEAD event
ASSERT_EQ(event_data["death_info"]["reason"], "UNEXPECTED_TERMINATION");
ASSERT_EQ(event_data["death_info"]["reason_message"], "mock reason message");
}

} // namespace ray

int main(int argc, char **argv) {
Expand Down
12 changes: 0 additions & 12 deletions src/ray/gcs/test/gcs_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

#pragma once

#include <filesystem>
#include <fstream>
#include <memory>
#include <utility>

Expand Down Expand Up @@ -428,16 +426,6 @@ struct Mocker {
}
return constraint;
}
// Read all lines of a file into vector vc
static void ReadContentFromFile(std::vector<std::string> &vc, std::string log_file) {
std::string line;
std::ifstream read_file;
read_file.open(log_file, std::ios::binary);
while (std::getline(read_file, line)) {
vc.push_back(line);
}
read_file.close();
}
};

} // namespace ray
10 changes: 1 addition & 9 deletions src/ray/util/event.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,16 +130,10 @@ std::string LogEventReporter::ExportEventToString(const rpc::ExportEvent &export
std::string event_data_as_string;
google::protobuf::util::JsonPrintOptions options;
options.preserve_proto_field_names = true;
// Required so enum with value 0 is not omitted
options.always_print_primitive_fields = true;
if (export_event.has_task_event_data()) {
RAY_CHECK(google::protobuf::util::MessageToJsonString(
export_event.task_event_data(), &event_data_as_string, options)
.ok());
} else if (export_event.has_node_event_data()) {
RAY_CHECK(google::protobuf::util::MessageToJsonString(
export_event.node_event_data(), &event_data_as_string, options)
.ok());
} else {
RAY_LOG(FATAL)
<< "event_data missing from export event with id " << export_event.event_id()
Expand Down Expand Up @@ -186,9 +180,7 @@ EventManager &EventManager::Instance() {
return instance_;
}

bool EventManager::IsEmpty() {
return reporter_map_.empty() && export_log_reporter_map_.empty();
}
bool EventManager::IsEmpty() { return reporter_map_.empty(); }

void EventManager::Publish(const rpc::Event &event, const json &custom_fields) {
for (const auto &element : reporter_map_) {
Expand Down
Loading