Skip to content

Commit

Permalink
[observability][export-api] Write node events (ray-project#47221)
Browse files Browse the repository at this point in the history
Write node events to file as part of the export API. This logic is only run if RayConfig::instance().enable_export_api_write() is true. Default value is false.
Event write is called whenever a value in the node event data schema is modified. Typically this occurs in the callback after writing NodeTable to the GCS table

Signed-off-by: ujjawal-khare <ujjawal.khare@dream11.com>
  • Loading branch information
nikitavemuri authored and ujjawal-khare committed Oct 15, 2024
1 parent 2b4c3e0 commit 24589a4
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/ray/gcs/gcs_server/gcs_node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ void GcsNodeManager::HandleRegisterNode(rpc::RegisterNodeRequest request,
AddNode(std::make_shared<rpc::GcsNodeInfo>(request.node_info()));
WriteNodeExportEvent(request.node_info());
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
WriteNodeExportEvent(request.node_info());
};
if (request.node_info().is_head_node()) {
// mark all old head nodes as dead if exists:
Expand Down Expand Up @@ -428,6 +429,7 @@ void GcsNodeManager::OnNodeFailure(const NodeID &node_id,
node_table_updated_callback(Status::OK());
}
RAY_CHECK_OK(gcs_publisher_->PublishNodeInfo(node_id, *node_info_delta, nullptr));
WriteNodeExportEvent(*node);
};
RAY_CHECK_OK(gcs_table_storage_->NodeTable().Put(node_id, *node, on_done));
} else if (node_table_updated_callback != nullptr) {
Expand Down
105 changes: 105 additions & 0 deletions src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <chrono>
#include <memory>
#include <thread>

// clang-format off
#include "gtest/gtest.h"
Expand All @@ -21,9 +23,11 @@
#include "ray/rpc/node_manager/node_manager_client.h"
#include "ray/rpc/node_manager/node_manager_client_pool.h"
#include "mock/ray/pubsub/publisher.h"
#include "ray/util/event.h"
// clang-format on

namespace ray {

class GcsNodeManagerTest : public ::testing::Test {
public:
GcsNodeManagerTest() {
Expand All @@ -41,6 +45,54 @@ class GcsNodeManagerTest : public ::testing::Test {
std::shared_ptr<gcs::GcsPublisher> gcs_publisher_;
};

std::string GenerateLogDir() {
std::string log_dir_generate = std::string(5, ' ');
FillRandom(&log_dir_generate);
std::string log_dir = "event" + StringToHex(log_dir_generate);
return log_dir;
}

class GcsNodeManagerExportAPITest : public ::testing::Test {
public:
GcsNodeManagerExportAPITest() {
raylet_client_ = std::make_shared<GcsServerMocker::MockRayletClient>();
client_pool_ = std::make_shared<rpc::NodeManagerClientPool>(
[this](const rpc::Address &) { return raylet_client_; });
gcs_publisher_ = std::make_shared<gcs::GcsPublisher>(
std::make_unique<ray::pubsub::MockPublisher>());
gcs_table_storage_ = std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_);

RayConfig::instance().initialize(
R"(
{
"enable_export_api_write": true
}
)");
log_dir_ = GenerateLogDir();
const std::vector<ray::SourceTypeVariant> source_types = {
rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_NODE};
RayEventInit_(source_types,
absl::flat_hash_map<std::string, std::string>(),
log_dir_,
"warning",
false);
}

virtual ~GcsNodeManagerExportAPITest() {
io_service_.stop();
EventManager::Instance().ClearReporters();
std::filesystem::remove_all(log_dir_.c_str());
}

protected:
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
std::shared_ptr<GcsServerMocker::MockRayletClient> raylet_client_;
std::shared_ptr<rpc::NodeManagerClientPool> client_pool_;
std::shared_ptr<gcs::GcsPublisher> gcs_publisher_;
instrumented_io_context io_service_;
std::string log_dir_;
};

TEST_F(GcsNodeManagerTest, TestManagement) {
gcs::GcsNodeManager node_manager(
gcs_publisher_, gcs_table_storage_, client_pool_, ClusterID::Nil());
Expand Down Expand Up @@ -96,6 +148,59 @@ TEST_F(GcsNodeManagerTest, TestListener) {
}
}

TEST_F(GcsNodeManagerExportAPITest, TestExportEventRegisterNode) {
// Test export event is written when a node is added with HandleRegisterNode
gcs::GcsNodeManager node_manager(
gcs_publisher_, gcs_table_storage_, client_pool_, ClusterID::Nil());
auto node = Mocker::GenNodeInfo();

rpc::RegisterNodeRequest register_request;
register_request.mutable_node_info()->CopyFrom(*node);
rpc::RegisterNodeReply register_reply;
auto send_reply_callback =
[](ray::Status status, std::function<void()> f1, std::function<void()> f2) {};

node_manager.HandleRegisterNode(register_request, &register_reply, send_reply_callback);
io_service_.poll();

std::vector<std::string> vc;
Mocker::ReadContentFromFile(vc, log_dir_ + "/events/event_EXPORT_NODE.log");
ASSERT_EQ((int)vc.size(), 1);
json event_data = json::parse(vc[0])["event_data"].get<json>();
ASSERT_EQ(event_data["state"], "ALIVE");
}

TEST_F(GcsNodeManagerExportAPITest, TestExportEventUnregisterNode) {
// Test export event is written when a node is removed with HandleUnregisterNode
gcs::GcsNodeManager node_manager(
gcs_publisher_, gcs_table_storage_, client_pool_, ClusterID::Nil());
auto node = Mocker::GenNodeInfo();
auto node_id = NodeID::FromBinary(node->node_id());
node_manager.AddNode(node);

rpc::UnregisterNodeRequest unregister_request;
unregister_request.set_node_id(node_id.Binary());
unregister_request.mutable_node_death_info()->set_reason(
rpc::NodeDeathInfo::UNEXPECTED_TERMINATION);
unregister_request.mutable_node_death_info()->set_reason_message("mock reason message");
rpc::UnregisterNodeReply unregister_reply;
auto send_reply_callback =
[](ray::Status status, std::function<void()> f1, std::function<void()> f2) {};

node_manager.HandleUnregisterNode(
unregister_request, &unregister_reply, send_reply_callback);
io_service_.poll();

std::vector<std::string> vc;
Mocker::ReadContentFromFile(vc, log_dir_ + "/events/event_EXPORT_NODE.log");
ASSERT_EQ((int)vc.size(), 1);
json event_data = json::parse(vc[0])["event_data"].get<json>();
ASSERT_EQ(event_data["state"], "DEAD");
// Verify death cause for last node DEAD event
ASSERT_EQ(event_data["death_info"]["reason"], "UNEXPECTED_TERMINATION");
ASSERT_EQ(event_data["death_info"]["reason_message"], "mock reason message");
}

} // namespace ray

int main(int argc, char **argv) {
Expand Down

0 comments on commit 24589a4

Please sign in to comment.