Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] The New GcsClient binding #46186

Merged
merged 42 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
87113d7
Add extra methods to GcsClient.
rynewang Jun 20, 2024
38d8569
Add Python bindings for sync methods for GcsClient
rynewang Jun 20, 2024
49d7ea1
various fixes
rynewang Jun 21, 2024
ed985a8
lint
rynewang Jun 21, 2024
2205487
fix typo
rynewang Jun 21, 2024
5612a34
fix mock
rynewang Jun 21, 2024
6fb30ac
fixes
rynewang Jun 22, 2024
a67e257
async getting cluster id
rynewang Jun 23, 2024
39b9699
move check
rynewang Jun 24, 2024
babc1b1
special treat the timeout error
rynewang Jun 24, 2024
af202c6
make get cluster id blocking
rynewang Jun 24, 2024
50d66a1
remove the race condition
rynewang Jun 25, 2024
10b3f09
fix serve bad fixture
rynewang Jun 25, 2024
afcb26f
lint
rynewang Jun 25, 2024
515d001
Merge branch 'master' into new-gcs-client-sync
rynewang Jun 25, 2024
d25e756
add get-cluster-id-on-connect
rynewang Jun 26, 2024
a2e5d3d
move cluster_id to options
rynewang Jun 26, 2024
f90448b
fix
rynewang Jun 26, 2024
d313a27
Merge remote-tracking branch 'origin/master' into new-gcs-client-sync
rynewang Jun 26, 2024
b2a5c2f
fix mock
rynewang Jun 27, 2024
e738e60
only retry on UNAVAILABLE and dtor on error path
rynewang Jun 27, 2024
42d5d3c
fix all failures (finally)
rynewang Jun 27, 2024
4f59d4c
Merge branch 'master' into new-gcs-client-sync
rynewang Jun 27, 2024
e2a5c02
Merge remote-tracking branch 'origin/master' into new-gcs-client-sync
rynewang Jul 9, 2024
0520d29
fix pxi
rynewang Jul 9, 2024
1f241be
fix pxi again
rynewang Jul 9, 2024
c0f1399
add back missing error
rynewang Jul 9, 2024
47faef9
Merge remote-tracking branch 'origin/master' into new-gcs-client-sync
rynewang Jul 10, 2024
b942de3
Merge remote-tracking branch 'origin/master' into new-gcs-client-sync
rynewang Jul 10, 2024
d49ac12
fix merge churn
rynewang Jul 10, 2024
a9b87ab
cpp test
rynewang Jul 10, 2024
a063152
add shutdown checker, revert check_status order
rynewang Jul 16, 2024
4fe97cd
retry 2 codes in cpp, no gil in check_status_2
rynewang Jul 16, 2024
77bf28e
Update python/ray/includes/gcs_client.pxi
rynewang Jul 16, 2024
6d7511d
lint
rynewang Jul 16, 2024
dfcd027
cdef the connect, remove conn check
rynewang Jul 16, 2024
cc60b45
reword comments
rynewang Jul 16, 2024
e8f1ba1
change init connection timeout_ms
rynewang Jul 16, 2024
caffd79
Merge branch 'master' into new-gcs-client-sync
rynewang Jul 17, 2024
747c468
add 1 try to the "retry"
rynewang Jul 17, 2024
6197394
fix default timeout logic
rynewang Jul 17, 2024
f31e58a
add back the timout
rynewang Jul 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions python/ray/_private/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,8 @@ def _init_gcs_client(self):
else:
gcs_process = None

# TODO(ryw) instead of create a new GcsClient, wrap the one from
# CoreWorkerProcess to save a grpc channel.
for _ in range(ray_constants.NUM_REDIS_GET_RETRIES):
gcs_address = None
last_ex = None
Expand Down
1 change: 1 addition & 0 deletions python/ray/_private/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ def run_string_as_driver(driver_script: str, env: Dict = None, encode: str = "ut
output = proc.communicate(driver_script.encode(encoding=encode))[0]
if proc.returncode:
print(ray._private.utils.decode(output, encode_type=encode))
logger.error(proc.stderr)
raise subprocess.CalledProcessError(
proc.returncode, proc.args, output, proc.stderr
)
Expand Down
2 changes: 1 addition & 1 deletion python/ray/_private/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ def ensure_str(s, encoding="utf-8", errors="strict"):
if isinstance(s, str):
return s
else:
assert isinstance(s, bytes)
assert isinstance(s, bytes), f"Expected str or bytes, got {type(s)}"
return s.decode(encoding, errors)


Expand Down
86 changes: 41 additions & 45 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ include "includes/ray_config.pxi"
include "includes/function_descriptor.pxi"
include "includes/buffer.pxi"
include "includes/common.pxi"
include "includes/gcs_client.pxi"
include "includes/serialization.pxi"
include "includes/libcoreworker.pxi"
include "includes/global_state_accessor.pxi"
Expand Down Expand Up @@ -555,50 +556,6 @@ class ObjectRefGenerator:
# For backward compatibility.
StreamingObjectRefGenerator = ObjectRefGenerator

cdef int check_status(const CRayStatus& status) nogil except -1:
if status.ok():
return 0

with gil:
message = status.message().decode()

if status.IsObjectStoreFull():
raise ObjectStoreFullError(message)
elif status.IsInvalidArgument():
raise ValueError(message)
elif status.IsOutOfDisk():
raise OutOfDiskError(message)
elif status.IsObjectRefEndOfStream():
raise ObjectRefStreamEndOfStreamError(message)
elif status.IsInterrupted():
raise KeyboardInterrupt()
elif status.IsTimedOut():
raise GetTimeoutError(message)
elif status.IsNotFound():
raise ValueError(message)
elif status.IsObjectNotFound():
raise ValueError(message)
elif status.IsObjectUnknownOwner():
raise ValueError(message)
elif status.IsIOError():
raise IOError(message)
elif status.IsRpcError():
raise RpcError(message, rpc_code=status.rpc_code())
elif status.IsIntentionalSystemExit():
with gil:
raise_sys_exit_with_custom_error_message(message)
elif status.IsUnexpectedSystemExit():
with gil:
raise_sys_exit_with_custom_error_message(
message, exit_code=1)
elif status.IsChannelError():
raise RayChannelError(message)
elif status.IsChannelTimeoutError():
raise RayChannelTimeoutError(message)
else:
raise RaySystemError(message)


cdef c_bool is_plasma_object(shared_ptr[CRayObject] obj):
"""Return True if the given object is a plasma object."""
assert obj.get() != NULL
Expand Down Expand Up @@ -2714,7 +2671,46 @@ def _auto_reconnect(f):


cdef class GcsClient:
"""Cython wrapper class of C++ `ray::gcs::GcsClient`."""
"""
Client to the GCS server. Only contains synchronous methods.

This class is in transition to use the new C++ GcsClient binding. The old
PythonGcsClient binding is not deleted until we are confident that the new
binding is stable.

Defaults to the new binding. If you want to use the old binding, please
set the environment variable `RAY_USE_OLD_GCS_CLIENT=1`.
"""

cdef object inner # OldGcsClient or NewGcsClient
cdef c_bool use_old_client

def __cinit__(self, address,
nums_reconnect_retry=None,
cluster_id: str = None):
self.use_old_client = os.getenv("RAY_USE_OLD_GCS_CLIENT") == "1"
if self.use_old_client:
self.inner = OldGcsClient(address, nums_reconnect_retry, cluster_id)
else:
# nums_reconnect_retry is ignored because now we rely on GcsRpcClient
# retry.
# TODO: it does not support initial connection when GCS is down. We need to
# support it in GcsRpcClient.
rynewang marked this conversation as resolved.
Show resolved Hide resolved
self.inner = NewGcsClient.standalone(address, cluster_id)
logger.debug(f"Created GcsClient. inner {self.inner}")

def __getattr__(self, name):
if self.use_old_client:
return getattr(self.inner, name)
# For new client, we collect the frequency of each method call.
# For old client, that is done in @_auto_reconnect.
if "TEST_RAY_COLLECT_KV_FREQUENCY" in os.environ:
with ray._private.utils._CALLED_FREQ_LOCK:
ray._private.utils._CALLED_FREQ[name] += 1
rynewang marked this conversation as resolved.
Show resolved Hide resolved
return getattr(self.inner, name)

cdef class OldGcsClient:
"""Old Cython wrapper class of C++ `ray::gcs::PythonGcsClient`."""
cdef:
shared_ptr[CPythonGcsClient] inner
object address
Expand Down
2 changes: 1 addition & 1 deletion python/ray/data/tests/test_block_sizing.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,4 +258,4 @@ def test_shuffle(shutdown_only, restore_data_context, shuffle_op):
if __name__ == "__main__":
import sys

sys.exit(pytest.main(["-v", __file__]))
sys.exit(pytest.main(["-sv", __file__]))
135 changes: 132 additions & 3 deletions python/ray/includes/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@ cdef extern from "ray/common/status.h" namespace "ray" nogil:
pass

cdef cppclass CRayStatus "ray::Status":
RayStatus()
RayStatus(StatusCode code, const c_string &msg)
RayStatus(const CRayStatus &s)
CRayStatus()
CRayStatus(StatusCode code, const c_string &msg)
CRayStatus(StatusCode code, const c_string &msg, int rpc_code)
CRayStatus(const CRayStatus &s)

@staticmethod
CRayStatus OK()
Expand Down Expand Up @@ -375,6 +376,115 @@ cdef extern from "ray/core_worker/common.h" nogil:
const CNodeID &GetSpilledNodeID() const
const c_bool GetDidSpill() const

cdef extern from "ray/gcs/gcs_client/accessor.h" nogil:
cdef cppclass CActorInfoAccessor "ray::gcs::ActorInfoAccessor":
pass

cdef cppclass CJobInfoAccessor "ray::gcs::JobInfoAccessor":
CRayStatus GetAll(
c_vector[CJobTableData] &result,
int64_t timeout_ms)

cdef cppclass CNodeInfoAccessor "ray::gcs::NodeInfoAccessor":
CRayStatus CheckAlive(
const c_vector[c_string] &raylet_addresses,
int64_t timeout_ms,
c_vector[c_bool] &result)

CRayStatus DrainNodes(
const c_vector[CNodeID] &node_ids,
int64_t timeout_ms,
c_vector[c_string] &drained_node_ids)

CRayStatus GetAllNoCache(
int64_t timeout_ms,
c_vector[CGcsNodeInfo] &result)

cdef cppclass CNodeResourceInfoAccessor "ray::gcs::NodeResourceInfoAccessor":
CRayStatus GetAllResourceUsage(
int64_t timeout_ms,
CGetAllResourceUsageReply &serialized_reply)

cdef cppclass CInternalKVAccessor "ray::gcs::InternalKVAccessor":
CRayStatus Keys(
const c_string &ns,
const c_string &prefix,
int64_t timeout_ms,
c_vector[c_string] &value)

CRayStatus Put(
const c_string &ns,
const c_string &key,
const c_string &value,
c_bool overwrite,
int64_t timeout_ms,
c_bool &added)

CRayStatus Get(
const c_string &ns,
const c_string &key,
int64_t timeout_ms,
c_string &value)

CRayStatus MultiGet(
const c_string &ns,
const c_vector[c_string] &keys,
int64_t timeout_ms,
unordered_map[c_string, c_string] &values)

CRayStatus Del(
const c_string &ns,
const c_string &key,
c_bool del_by_prefix,
int64_t timeout_ms,
int& num_deleted)

CRayStatus Exists(
const c_string &ns,
const c_string &key,
int64_t timeout_ms,
c_bool &exists)

cdef cppclass CRuntimeEnvAccessor "ray::gcs::RuntimeEnvAccessor":
CRayStatus PinRuntimeEnvUri(
const c_string &uri,
int expiration_s,
int64_t timeout_ms)

cdef cppclass CAutoscalerStateAccessor "ray::gcs::AutoscalerStateAccessor":

CRayStatus RequestClusterResourceConstraint(
int64_t timeout_ms,
const c_vector[unordered_map[c_string, double]] &bundles,
const c_vector[int64_t] &count_array
)

CRayStatus GetClusterResourceState(
int64_t timeout_ms,
c_string &serialized_reply
)

CRayStatus GetClusterStatus(
int64_t timeout_ms,
c_string &serialized_reply
)

CRayStatus ReportAutoscalingState(
int64_t timeout_ms,
const c_string &serialized_state
)

CRayStatus DrainNode(
const c_string &node_id,
int32_t reason,
const c_string &reason_message,
int64_t deadline_timestamp_ms,
int64_t timeout_ms,
c_bool &is_accepted,
c_string &rejection_reason_message
)


cdef extern from "ray/gcs/gcs_client/gcs_client.h" nogil:
cdef enum CGrpcStatusCode "grpc::StatusCode":
UNAVAILABLE "grpc::StatusCode::UNAVAILABLE",
Expand All @@ -388,6 +498,22 @@ cdef extern from "ray/gcs/gcs_client/gcs_client.h" nogil:
const c_string &gcs_address, int port, CClusterID cluster_id,
c_bool allow_cluster_id_nil, c_bool fetch_cluster_id_if_nil)

cdef cppclass CGcsClient "ray::gcs::GcsClient":
CGcsClient(const CGcsClientOptions &options)

c_pair[c_string, int] GetGcsServerAddress() const
CClusterID GetClusterId() const

CActorInfoAccessor& Actors()
CJobInfoAccessor& Jobs()
CInternalKVAccessor& InternalKV()
CNodeInfoAccessor& Nodes()
CNodeResourceInfoAccessor& NodeResources()
CRuntimeEnvAccessor& RuntimeEnvs()
CAutoscalerStateAccessor& Autoscaler()

CRayStatus ConnectOnSingletonIoContext(CGcsClient &gcs_client)

cdef cppclass CPythonGcsClient "ray::gcs::PythonGcsClient":
CPythonGcsClient(const CGcsClientOptions &options)

Expand Down Expand Up @@ -540,6 +666,9 @@ cdef extern from "src/ray/protobuf/gcs.pb.h" nogil:
CJobConfig config() const
const c_string &SerializeAsString() const

cdef cppclass CGetAllResourceUsageReply "ray::rpc::GetAllResourceUsageReply":
const c_string& SerializeAsString() const

cdef cppclass CPythonFunction "ray::rpc::PythonFunction":
void set_key(const c_string &key)
c_string key() const
Expand Down
69 changes: 69 additions & 0 deletions python/ray/includes/common.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,22 @@ from ray.includes.common cimport (
kStreamingGeneratorReturn,
)

from ray.exceptions import (
RayActorError,
ActorDiedError,
RayError,
RaySystemError,
RayTaskError,
ObjectStoreFullError,
OutOfDiskError,
GetTimeoutError,
TaskCancelledError,
AsyncioActorExit,
PendingCallsLimitExceeded,
RpcError,
ObjectRefStreamEndOfStreamError,
)


cdef class GcsClientOptions:
"""Cython wrapper class of C++ `ray::gcs::GcsClientOptions`."""
Expand Down Expand Up @@ -44,6 +60,59 @@ cdef class GcsClientOptions:
cdef CGcsClientOptions* native(self):
return <CGcsClientOptions*>(self.inner.get())

cdef int check_status_timeout_as_rpc_error(const CRayStatus& status) nogil except -1:
if status.ok():
return 0

with gil:
message = status.message().decode()

if status.IsObjectStoreFull():
raise ObjectStoreFullError(message)
elif status.IsInvalidArgument():
raise ValueError(message)
elif status.IsOutOfDisk():
raise OutOfDiskError(message)
elif status.IsObjectRefEndOfStream():
raise ObjectRefStreamEndOfStreamError(message)
elif status.IsInterrupted():
raise KeyboardInterrupt()
elif status.IsTimedOut():
raise RpcError(message, rpc_code=CGrpcStatusCode.DEADLINE_EXCEEDED)
elif status.IsNotFound():
raise ValueError(message)
elif status.IsObjectNotFound():
raise ValueError(message)
elif status.IsObjectUnknownOwner():
raise ValueError(message)
elif status.IsIOError():
raise IOError(message)
elif status.IsRpcError():
raise RpcError(message, rpc_code=status.rpc_code())
elif status.IsIntentionalSystemExit():
with gil:
raise_sys_exit_with_custom_error_message(message)
elif status.IsUnexpectedSystemExit():
with gil:
raise_sys_exit_with_custom_error_message(
message, exit_code=1)
elif status.IsChannelError():
raise RayChannelError(message)
elif status.IsChannelTimeoutError():
raise RayChannelTimeoutError(message)
else:
raise RaySystemError(message)


cdef int check_status(const CRayStatus& status) nogil except -1:
with gil:
rynewang marked this conversation as resolved.
Show resolved Hide resolved
try:
return check_status_timeout_as_rpc_error(status)
except RpcError as e:
if e.rpc_code == CGrpcStatusCode.DEADLINE_EXCEEDED:
raise GetTimeoutError(e.message)
else:
raise e

WORKER_PROCESS_SETUP_HOOK_KEY_NAME_GCS = str(kWorkerSetupHookKeyName)
RESOURCE_UNIT_SCALING = kResourceUnitScaling
Expand Down
Loading