Skip to content

Commit

Permalink
[1/n] Stabilize GCS/Autoscaler interface: Introduce monitor server (#…
Browse files Browse the repository at this point in the history
…31827)

This is the first PR towards stabilizing the GCS autoscaler interface by introducing a new grpc service definition which we will provide backwards compatibility guarantees.

This PR mostly just introduces scaffolding and a trivial GetRayVersion endpoint.

By the end of this series of PRs, monitor.py will only communicate with the rest of the ray cluster via this service definition.
  • Loading branch information
Alex Wu authored Jan 26, 2023
1 parent a32b9b1 commit e753b03
Show file tree
Hide file tree
Showing 11 changed files with 278 additions and 0 deletions.
36 changes: 36 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,23 @@ cc_library(
],
)

# monitor/autoscaler service
cc_grpc_library(
name = "monitor_cc_grpc",
srcs = ["//src/ray/protobuf:monitor_proto"],
grpc_only = True,
deps = ["//src/ray/protobuf:monitor_cc_proto"],
)

cc_library(
name = "monitor_rpc",
copts = COPTS,
visibility = ["//visibility:public"],
deps = [
":monitor_cc_grpc",
],
)

# === End of rpc definitions ===

# === Begin of plasma definitions ===
Expand Down Expand Up @@ -541,6 +558,7 @@ cc_library(
":gcs_service_cc_grpc",
":gcs_service_rpc",
":gcs_table_storage_lib",
":monitor_rpc",
":node_manager_rpc",
":observable_store_client",
":pubsub_lib",
Expand Down Expand Up @@ -2020,6 +2038,23 @@ cc_test(
],
)

cc_test(
name = "gcs_monitor_server_test",
size = "small",
srcs = [
"src/ray/gcs/gcs_server/test/gcs_monitor_server_test.cc",
],
copts = COPTS,
tags = ["team:serverless"],
deps = [
":gcs_server_lib",
":gcs_server_test_util",
":gcs_test_util_lib",
":ray_mock",
"@com_google_googletest//:gtest_main",
],
)

cc_library(
name = "gcs_table_storage_lib",
srcs = glob(
Expand Down Expand Up @@ -2828,6 +2863,7 @@ filegroup(
"//src/ray/protobuf:gcs_py_proto",
"//src/ray/protobuf:gcs_service_py_proto",
"//src/ray/protobuf:job_agent_py_proto",
"//src/ray/protobuf:monitor_py_proto",
"//src/ray/protobuf:node_manager_py_proto",
"//src/ray/protobuf:ray_client_py_proto",
"//src/ray/protobuf:reporter_py_proto",
Expand Down
8 changes: 8 additions & 0 deletions python/ray/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,14 @@ py_test_module_list(
deps = ["//:ray_lib", ":conftest"],
)

py_test_module_list(
files = [
"test_monitor_service.py",
],
size = "medium",
tags = ["exclusive", "medium_size_python_tests_k_to_z", "team:serverless"],
deps = ["//:ray_lib", ":conftest"],
)

py_test_module_list(
files = [
Expand Down
18 changes: 18 additions & 0 deletions python/ray/tests/test_monitor_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import pytest

import ray
import grpc
from ray.core.generated import monitor_pb2, monitor_pb2_grpc


@pytest.fixture
def monitor_stub(ray_start_regular_shared):
channel = grpc.insecure_channel(ray_start_regular_shared["gcs_address"])

return monitor_pb2_grpc.MonitorGcsServiceStub(channel)


def test_ray_version(monitor_stub):
request = monitor_pb2.GetRayVersionRequest()
response = monitor_stub.GetRayVersion(request)
assert response.version == ray.__version__
32 changes: 32 additions & 0 deletions src/ray/gcs/gcs_server/gcs_monitor_server.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "ray/gcs/gcs_server/gcs_monitor_server.h"

#include "ray/common/constants.h"

namespace ray {
namespace gcs {

GcsMonitorServer::GcsMonitorServer() {}

void GcsMonitorServer::HandleGetRayVersion(rpc::GetRayVersionRequest request,
rpc::GetRayVersionReply *reply,
rpc::SendReplyCallback send_reply_callback) {
reply->set_version(kRayVersion);
send_reply_callback(Status::OK(), nullptr, nullptr);
}

} // namespace gcs
} // namespace ray
33 changes: 33 additions & 0 deletions src/ray/gcs/gcs_server/gcs_monitor_server.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2023 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include "ray/rpc/gcs_server/gcs_rpc_server.h"

namespace ray {
namespace gcs {

/// GcsMonitorServer is a shim responsible for providing a compatible interface between
/// GCS and `monitor.py`
class GcsMonitorServer : public rpc::MonitorServiceHandler {
public:
explicit GcsMonitorServer();

void HandleGetRayVersion(rpc::GetRayVersionRequest request,
rpc::GetRayVersionReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
};
} // namespace gcs
} // namespace ray
10 changes: 10 additions & 0 deletions src/ray/gcs/gcs_server/gcs_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ void GcsServer::DoStart(const GcsInitData &gcs_init_data) {
// Init GCS task manager.
InitGcsTaskManager();

// Init Monitor service.
InitMonitorServer();

// Install event listeners.
InstallEventListeners();

Expand Down Expand Up @@ -586,6 +589,13 @@ void GcsServer::InitGcsTaskManager() {
rpc_server_.RegisterService(*task_info_service_);
}

void GcsServer::InitMonitorServer() {
monitor_server_ = std::make_unique<GcsMonitorServer>();
monitor_grpc_service_.reset(
new rpc::MonitorGrpcService(main_service_, *monitor_server_));
rpc_server_.RegisterService(*monitor_grpc_service_);
}

void GcsServer::InstallEventListeners() {
// Install node event listeners.
gcs_node_manager_->AddNodeAddedListener([this](std::shared_ptr<rpc::GcsNodeInfo> node) {
Expand Down
8 changes: 8 additions & 0 deletions src/ray/gcs/gcs_server/gcs_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "ray/gcs/gcs_server/gcs_health_check_manager.h"
#include "ray/gcs/gcs_server/gcs_init_data.h"
#include "ray/gcs/gcs_server/gcs_kv_manager.h"
#include "ray/gcs/gcs_server/gcs_monitor_server.h"
#include "ray/gcs/gcs_server/gcs_redis_failure_detector.h"
#include "ray/gcs/gcs_server/gcs_table_storage.h"
#include "ray/gcs/gcs_server/gcs_task_manager.h"
Expand Down Expand Up @@ -151,6 +152,9 @@ class GcsServer {
/// Install event listeners.
void InstallEventListeners();

/// Initialize monitor service.
void InitMonitorServer();

private:
/// Gets the type of KV storage to use from config.
std::string StorageType() const;
Expand Down Expand Up @@ -215,6 +219,10 @@ class GcsServer {
std::unique_ptr<GcsFunctionManager> function_manager_;
/// Node resource info handler and service.
std::unique_ptr<rpc::NodeResourceInfoGrpcService> node_resource_info_service_;
/// Monitor server supports monitor.py
std::unique_ptr<GcsMonitorServer> monitor_server_;
/// Monitor service for monitor server
std::unique_ptr<rpc::MonitorGrpcService> monitor_grpc_service_;

/// Synchronization service for ray.
/// TODO(iycheng): Deprecate this gcs_ray_syncer_ one once we roll out
Expand Down
46 changes: 46 additions & 0 deletions src/ray/gcs/gcs_server/test/gcs_monitor_server_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@

// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <memory>

// clang-format off
#include "gtest/gtest.h"
#include "ray/gcs/gcs_server/test/gcs_server_test_util.h"
#include "ray/gcs/test/gcs_test_util.h"
#include "ray/gcs/gcs_server/gcs_monitor_server.h"
#include "mock/ray/pubsub/publisher.h"
// clang-format on

namespace ray {
class GcsMonitorServerTest : public ::testing::Test {
public:
GcsMonitorServerTest() : monitor_server_() {}

protected:
gcs::GcsMonitorServer monitor_server_;
};

TEST_F(GcsMonitorServerTest, TestRayVersion) {
rpc::GetRayVersionRequest request;
rpc::GetRayVersionReply reply;
auto send_reply_callback =
[](ray::Status status, std::function<void()> f1, std::function<void()> f2) {};

monitor_server_.HandleGetRayVersion(request, &reply, send_reply_callback);

ASSERT_EQ(reply.version(), kRayVersion);
}

} // namespace ray
15 changes: 15 additions & 0 deletions src/ray/protobuf/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,21 @@ load("@rules_proto//proto:defs.bzl", "proto_library")
load("@rules_cc//cc:defs.bzl", "cc_binary", "cc_library", "cc_proto_library", "cc_test")
load("@rules_proto_grpc//python:defs.bzl", "python_grpc_compile")

proto_library(
name = "monitor_proto",
srcs = ["monitor.proto"],
)

cc_proto_library(
name = "monitor_cc_proto",
deps = [":monitor_proto"],
)

python_grpc_compile(
name = "monitor_py_proto",
deps = [":monitor_proto"],
)

proto_library(
name = "common_proto",
srcs = ["common.proto"],
Expand Down
30 changes: 30 additions & 0 deletions src/ray/protobuf/monitor.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto3";
option cc_enable_arenas = true;
package ray.rpc;

message GetRayVersionRequest {}

message GetRayVersionReply {
string version = 1;
}

// This service provides a stable interface for a monitor/autoscaler process to interact
// with Ray.
service MonitorGcsService {
// Get the ray version of the service.
rpc GetRayVersion(GetRayVersionRequest) returns (GetRayVersionReply);
}
42 changes: 42 additions & 0 deletions src/ray/rpc/gcs_server/gcs_rpc_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "ray/rpc/grpc_server.h"
#include "ray/rpc/server_call.h"
#include "src/ray/protobuf/gcs_service.grpc.pb.h"
#include "src/ray/protobuf/monitor.grpc.pb.h"

namespace ray {
namespace rpc {
Expand All @@ -31,6 +32,11 @@ namespace rpc {
#define ACTOR_INFO_SERVICE_RPC_HANDLER(HANDLER, MAX_ACTIVE_RPCS) \
RPC_SERVICE_HANDLER(ActorInfoGcsService, HANDLER, MAX_ACTIVE_RPCS)

#define MONITOR_SERVICE_RPC_HANDLER(HANDLER) \
RPC_SERVICE_HANDLER(MonitorGcsService, \
HANDLER, \
RayConfig::instance().gcs_max_active_rpcs_per_handler())

#define NODE_INFO_SERVICE_RPC_HANDLER(HANDLER) \
RPC_SERVICE_HANDLER(NodeInfoGcsService, \
HANDLER, \
Expand Down Expand Up @@ -209,6 +215,41 @@ class ActorInfoGrpcService : public GrpcService {
ActorInfoGcsServiceHandler &service_handler_;
};

class MonitorGcsServiceHandler {
public:
virtual ~MonitorGcsServiceHandler() = default;

virtual void HandleGetRayVersion(GetRayVersionRequest request,
GetRayVersionReply *reply,
SendReplyCallback send_reply_callback) = 0;
};

/// The `GrpcService` for `MonitorServer`.
class MonitorGrpcService : public GrpcService {
public:
/// Constructor.
///
/// \param[in] handler The service handler that actually handle the requests.
explicit MonitorGrpcService(instrumented_io_context &io_service,
MonitorGcsServiceHandler &handler)
: GrpcService(io_service), service_handler_(handler){};

protected:
grpc::Service &GetGrpcService() override { return service_; }

void InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories) override {
MONITOR_SERVICE_RPC_HANDLER(GetRayVersion);
}

private:
/// The grpc async service object.
MonitorGcsService::AsyncService service_;
/// The service handler that actually handle the requests.
MonitorGcsServiceHandler &service_handler_;
};

class NodeInfoGcsServiceHandler {
public:
virtual ~NodeInfoGcsServiceHandler() = default;
Expand Down Expand Up @@ -581,6 +622,7 @@ class InternalPubSubGrpcService : public GrpcService {

using JobInfoHandler = JobInfoGcsServiceHandler;
using ActorInfoHandler = ActorInfoGcsServiceHandler;
using MonitorServiceHandler = MonitorGcsServiceHandler;
using NodeInfoHandler = NodeInfoGcsServiceHandler;
using NodeResourceInfoHandler = NodeResourceInfoGcsServiceHandler;
using WorkerInfoHandler = WorkerInfoGcsServiceHandler;
Expand Down

0 comments on commit e753b03

Please sign in to comment.