Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/ray/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ constexpr int kRayletStoreErrorExitCode = 100;
constexpr char kObjectTablePrefix[] = "ObjectTable";

constexpr char kClusterIdKey[] = "ray_cluster_id";
constexpr char kAuthTokenKey[] = "authorization";
constexpr char kBearerPrefix[] = "Bearer ";

constexpr char kWorkerDynamicOptionPlaceholder[] =
"RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER";
Expand Down
4 changes: 4 additions & 0 deletions src/ray/common/grpc_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ inline grpc::Status RayStatusToGrpcStatus(const Status &ray_status) {
if (ray_status.ok()) {
return grpc::Status::OK;
}
// Map Unauthenticated to gRPC's UNAUTHENTICATED status code
if (ray_status.IsUnauthenticated()) {
return grpc::Status(grpc::StatusCode::UNAUTHENTICATED, ray_status.message());
}
// Unlike `UNKNOWN`, `ABORTED` is never generated by the library, so using it means
// more robust.
return grpc::Status(
Expand Down
6 changes: 6 additions & 0 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ RAY_CONFIG(bool, emit_main_service_metrics, true)
/// Whether to enable cluster authentication.
RAY_CONFIG(bool, enable_cluster_auth, true)

/// Whether to enable token-based authentication for RPC calls.
/// will be converted to AuthenticationMode enum defined in
/// rpc/authentication/authentication_mode.h
/// use GetAuthenticationMode() to get the authentication mode enum value.
RAY_CONFIG(std::string, auth_mode, "disabled")

/// The interval of periodic event loop stats print.
/// -1 means the feature is disabled. In this case, stats are available
/// in the associated process's log file.
Expand Down
2 changes: 1 addition & 1 deletion src/ray/common/status.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ const absl::flat_hash_map<StatusCode, std::string_view> kCodeToStr = {
{StatusCode::RpcError, "RpcError"},
{StatusCode::OutOfResource, "OutOfResource"},
{StatusCode::ObjectRefEndOfStream, "ObjectRefEndOfStream"},
{StatusCode::AuthError, "AuthError"},
{StatusCode::Unauthenticated, "Unauthenticated"},
{StatusCode::InvalidArgument, "InvalidArgument"},
{StatusCode::ChannelError, "ChannelError"},
{StatusCode::ChannelTimeoutError, "ChannelTimeoutError"},
Expand Down
8 changes: 4 additions & 4 deletions src/ray/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ enum class StatusCode : char {
RpcError = 30,
OutOfResource = 31,
ObjectRefEndOfStream = 32,
AuthError = 33,
Unauthenticated = 33,
// Indicates the input value is not valid.
InvalidArgument = 34,
// Indicates that a channel (a mutable plasma object) is closed and cannot be
Expand Down Expand Up @@ -415,8 +415,8 @@ class RAY_EXPORT Status {
return Status(StatusCode::OutOfResource, msg);
}

static Status AuthError(const std::string &msg) {
return Status(StatusCode::AuthError, msg);
static Status Unauthenticated(const std::string &msg) {
return Status(StatusCode::Unauthenticated, msg);
}

static Status ChannelError(const std::string &msg) {
Expand Down Expand Up @@ -475,7 +475,7 @@ class RAY_EXPORT Status {

bool IsOutOfResource() const { return code() == StatusCode::OutOfResource; }

bool IsAuthError() const { return code() == StatusCode::AuthError; }
bool IsUnauthenticated() const { return code() == StatusCode::Unauthenticated; }

bool IsChannelError() const { return code() == StatusCode::ChannelError; }

Expand Down
1 change: 1 addition & 0 deletions src/ray/core_worker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ ray_cc_library(
"//src/ray/protobuf:core_worker_cc_proto",
"//src/ray/rpc:grpc_server",
"//src/ray/rpc:rpc_callback_types",
"//src/ray/rpc/authentication:authentication_token",
],
)

Expand Down
76 changes: 45 additions & 31 deletions src/ray/core_worker/grpc_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "ray/core_worker/grpc_service.h"

#include <memory>
#include <string>
#include <vector>

namespace ray {
Expand All @@ -23,91 +24,104 @@ namespace rpc {
void CoreWorkerGrpcService::InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories,
const ClusterID &cluster_id) {
const ClusterID &cluster_id,
const std::optional<AuthenticationToken> &auth_token) {
/// TODO(vitsai): Remove this when auth is implemented for node manager.
/// Disable gRPC server metrics since it incurs too high cardinality.
Comment on lines 29 to 30
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should implement this later in the project

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack, just to clarify, we want to enable metrics once auth is implemented right?

there will probably be a couple of considerations before we can go ahead and make this change. will keep it in backlog for now

RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(
CoreWorkerService, PushTask, max_active_rpcs_per_handler_, AuthType::NO_AUTH);
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
PushTask,
max_active_rpcs_per_handler_,
ClusterIdAuthType::NO_AUTH);
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
ActorCallArgWaitComplete,
max_active_rpcs_per_handler_,
AuthType::NO_AUTH);
ClusterIdAuthType::NO_AUTH);
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
RayletNotifyGCSRestart,
max_active_rpcs_per_handler_,
AuthType::NO_AUTH);
ClusterIdAuthType::NO_AUTH);
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
GetObjectStatus,
max_active_rpcs_per_handler_,
AuthType::NO_AUTH);
ClusterIdAuthType::NO_AUTH);
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
WaitForActorRefDeleted,
max_active_rpcs_per_handler_,
AuthType::NO_AUTH);
ClusterIdAuthType::NO_AUTH);
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
PubsubLongPolling,
max_active_rpcs_per_handler_,
AuthType::NO_AUTH);
ClusterIdAuthType::NO_AUTH);
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
PubsubCommandBatch,
max_active_rpcs_per_handler_,
AuthType::NO_AUTH);
ClusterIdAuthType::NO_AUTH);
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
UpdateObjectLocationBatch,
max_active_rpcs_per_handler_,
AuthType::NO_AUTH);
ClusterIdAuthType::NO_AUTH);
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
GetObjectLocationsOwner,
max_active_rpcs_per_handler_,
AuthType::NO_AUTH);
ClusterIdAuthType::NO_AUTH);
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
ReportGeneratorItemReturns,
max_active_rpcs_per_handler_,
AuthType::NO_AUTH);
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(
CoreWorkerService, KillActor, max_active_rpcs_per_handler_, AuthType::NO_AUTH);
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(
CoreWorkerService, CancelTask, max_active_rpcs_per_handler_, AuthType::NO_AUTH);
ClusterIdAuthType::NO_AUTH);
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
KillActor,
max_active_rpcs_per_handler_,
ClusterIdAuthType::NO_AUTH);
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
CancelTask,
max_active_rpcs_per_handler_,
ClusterIdAuthType::NO_AUTH);
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
CancelRemoteTask,
max_active_rpcs_per_handler_,
AuthType::NO_AUTH);
ClusterIdAuthType::NO_AUTH);
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
RegisterMutableObjectReader,
max_active_rpcs_per_handler_,
AuthType::NO_AUTH);
ClusterIdAuthType::NO_AUTH);
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
GetCoreWorkerStats,
max_active_rpcs_per_handler_,
AuthType::NO_AUTH);
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(
CoreWorkerService, LocalGC, max_active_rpcs_per_handler_, AuthType::NO_AUTH);
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(
CoreWorkerService, DeleteObjects, max_active_rpcs_per_handler_, AuthType::NO_AUTH);
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(
CoreWorkerService, SpillObjects, max_active_rpcs_per_handler_, AuthType::NO_AUTH);
ClusterIdAuthType::NO_AUTH);
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
LocalGC,
max_active_rpcs_per_handler_,
ClusterIdAuthType::NO_AUTH);
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
DeleteObjects,
max_active_rpcs_per_handler_,
ClusterIdAuthType::NO_AUTH);
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
SpillObjects,
max_active_rpcs_per_handler_,
ClusterIdAuthType::NO_AUTH);
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
RestoreSpilledObjects,
max_active_rpcs_per_handler_,
AuthType::NO_AUTH);
ClusterIdAuthType::NO_AUTH);
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
DeleteSpilledObjects,
max_active_rpcs_per_handler_,
AuthType::NO_AUTH);
ClusterIdAuthType::NO_AUTH);
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
PlasmaObjectReady,
max_active_rpcs_per_handler_,
AuthType::NO_AUTH);
ClusterIdAuthType::NO_AUTH);
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(
CoreWorkerService, Exit, max_active_rpcs_per_handler_, AuthType::NO_AUTH);
CoreWorkerService, Exit, max_active_rpcs_per_handler_, ClusterIdAuthType::NO_AUTH);
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
AssignObjectOwner,
max_active_rpcs_per_handler_,
AuthType::NO_AUTH);
ClusterIdAuthType::NO_AUTH);
RPC_SERVICE_HANDLER_CUSTOM_AUTH_SERVER_METRICS_DISABLED(CoreWorkerService,
NumPendingTasks,
max_active_rpcs_per_handler_,
AuthType::NO_AUTH);
ClusterIdAuthType::NO_AUTH);
}

} // namespace rpc
Expand Down
6 changes: 5 additions & 1 deletion src/ray/core_worker/grpc_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@
#pragma once

#include <memory>
#include <optional>
#include <string>
#include <vector>

#include "ray/common/asio/instrumented_io_context.h"
#include "ray/rpc/authentication/authentication_token.h"
#include "ray/rpc/grpc_server.h"
#include "ray/rpc/rpc_callback_types.h"
#include "src/ray/protobuf/core_worker.grpc.pb.h"
Expand Down Expand Up @@ -158,7 +161,8 @@ class CoreWorkerGrpcService : public GrpcService {
void InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories,
const ClusterID &cluster_id) override;
const ClusterID &cluster_id,
const std::optional<AuthenticationToken> &auth_token) override;

private:
CoreWorkerService::AsyncService service_;
Expand Down
1 change: 1 addition & 0 deletions src/ray/gcs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ ray_cc_library(
"//src/ray/protobuf:gcs_service_cc_grpc",
"//src/ray/rpc:grpc_server",
"//src/ray/rpc:rpc_callback_types",
"//src/ray/rpc/authentication:authentication_token",
"@com_github_grpc_grpc//:grpc++",
],
)
Expand Down
Loading