[core] The New GcsClient binding #46186
Conversation
Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
@@ -241,7 +241,7 @@ class GcsRpcClient {
        callback(status, reply);
      }
      delete executor;
-    } else if (!status.IsRpcError()) {
+    } else if (!IsGrpcRetryableStatus(status)) {
why this change?
GCS can return non-OK and non-retryable errors. Excerpt from the PR initial comment:
- GcsClient now only retries grpc UNAVAILABLE and not other codes, since we only want to retry when GCS is down and not in other cases (e.g. RESOURCE_EXHAUSTED). See python/ray/tune/tests/test_tune_restore.py::ResourceExhaustedTest::test_resource_exhausted_info
In this example, Ray Train sent an extra-large message to GCS, which exceeds the gRPC limit. It can't possibly be sent, so we should not retry.
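For illustration, here is a minimal Python sketch of that policy. The real check is the C++ IsGrpcRetryableStatus shown in the diff; the helper name and code set below are hypothetical.

```python
# Sketch of the retry policy described above: only treat "GCS is down" style
# errors as retryable. Errors like RESOURCE_EXHAUSTED (e.g. a message over the
# gRPC size limit) fail the same way on every attempt, so retrying is pointless.
import grpc

RETRYABLE_CODES = {grpc.StatusCode.UNAVAILABLE}

def is_retryable(err: grpc.RpcError) -> bool:
    # grpc.Call-based errors expose the status code via .code().
    return err.code() in RETRYABLE_CODES
```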
except RpcError as e:
    if e.rpc_code in [
        GRPC_STATUS_CODE_UNAVAILABLE,
        GRPC_STATUS_CODE_UNKNOWN,
    ]:
        if remaining_retry <= 0:
The Python gcs client is retrying these two. Let's do the same for now.
Let's also double-check the thread safety as we discussed.
A comment on thread safety: GcsClient is thread safe, except when Disconnect() races with in-flight calls. For the Python bindings, however, this won't happen, because we don't expose Disconnect() at all - it's only called when the last reference to the Python binding expires, and hence when the C++ GcsClient is destructed. At that time it's not possible to have a concurrent RPC call happening.

It is possible, though, for Python to call Connect() a second time. To guard against that I added a return-if-already-connected check in Connect(). Note that in C++ it's still possible to have race conditions because one can call Disconnect(). An example is GlobalStateAccessor(), which does its own mutexing.
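A small Python sketch of the "return if already connected" guard described above; the class and method names are illustrative, and the real check sits in the C++ GcsClient binding's Connect path.

```python
# Illustrative guard: a second connect() becomes a no-op instead of
# re-initializing state underneath a live client.
import threading

class GcsClientBinding:
    def __init__(self):
        self._lock = threading.Lock()
        self._connected = False

    def connect(self):
        with self._lock:
            if self._connected:
                return  # already connected: do nothing
            # ... establish the real connection here ...
            self._connected = True
```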
Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
lg
@@ -241,7 +241,7 @@ class GcsRpcClient {
        callback(status, reply);
      }
      delete executor;
-    } else if (!status.IsRpcError()) {
+    } else if (!IsGrpcRetryableStatus(status)) {
except RpcError as e:
    if e.rpc_code in [
        GRPC_STATUS_CODE_UNAVAILABLE,
        GRPC_STATUS_CODE_UNKNOWN,
    ]:
        if remaining_retry <= 0:
The Python gcs client is retrying these two. Let's do the same for now.
How come? I thought Connect() is not exposed to the Python side?
Yeah, I meant we do have the binding of Connect() exposed to the Python side. Nobody is really calling it a second time.
Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
Co-authored-by: Jiajun Yao <jeromeyjj@gmail.com> Signed-off-by: Ruiyang Wang <56065503+rynewang@users.noreply.github.com>
Creates a direct Cython binding for ray::gcs::GcsClient and replaces the existing PythonGcsClient binding. The new binding is enabled by default; one can switch back with RAY_USE_OLD_GCS_CLIENT=1. The new binding is in its own file gcs_client.pxi, included by _raylet.pyx.
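A hedged usage sketch of the opt-out flag, assuming RAY_USE_OLD_GCS_CLIENT is read from the environment when the client is constructed; in practice you would usually export it in the shell before starting the Ray processes.

```python
# Opt back into the old PythonGcsClient binding by setting the flag
# before Ray is imported/started.
import os

os.environ["RAY_USE_OLD_GCS_CLIENT"] = "1"

import ray  # imported after setting the flag

ray.init()
```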
This PR is a proof of concept that we can do bindings for async C++ APIs. Specifically, we can wrap the C++ ray::gcs::GcsClient async APIs (callback-based) into Python async APIs (async/await).
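Here is an illustrative sketch (not the actual Cython binding) of that bridge: a callback-style call, as the C++ GcsClient exposes, is adapted into an awaitable by resolving an asyncio.Future from the callback. All names are hypothetical.

```python
import asyncio

def callback_style_get(key, callback):
    # Stand-in for a C++ async API that reports its result via a callback,
    # possibly from a different (non-event-loop) thread.
    callback(f"value-for-{key}")

async def awaitable_get(key):
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def on_done(result):
        # Hop back onto the event loop thread before touching the future.
        loop.call_soon_threadsafe(future.set_result, result)

    callback_style_get(key, on_done)
    return await future

print(asyncio.run(awaitable_get("node_info")))
```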
Why?

On Python gRPC, we now have a very complicated and inconsistent setup. We have:
- a grpcio based Client,
- grpcio.aio based Clients and Servers,
- GcsClient -> C++ PythonGcsClient -> C++ grpc::Channel,
- GcsAioClient -> thread pool executor -> Python GcsClient -> C++ PythonGcsClient -> C++ grpc::Channel,
- Gcs.*Subscriber (sync),
- GcsAio.*Subscriber (async).

All of them talk to the GCS with more or less similar but subtly different APIs. This introduces maintenance overhead, makes debugging harder, and makes it harder to add new features.
Beyond Python, all these APIs also have slightly different semantics than the C++ GcsClient itself as used by the core_worker C++ code. For example:
- We add @_auto_reconnect to APIs. This applies to the Python GcsClient and GcsAioClient, but not to the C++ GcsClient or the Python subscribers. If we tweak retry counts to "happen to work", it only works for the Python code but not the core worker code. (A generic sketch of such a retry decorator follows this list.)
- In PythonGcsClient::Connect we retry several times, each time recreating a GcsChannel. This is supposed to make reconnection faster by not waiting in the grpc-internal backoff. But this is not applied in the C++ GcsClient or the Python subscribers. In fact, in the C++ GcsClient we don't manage the channel at all; we use the Ray-wide GcsRpcClient to manage it. If we want to "fast recreate" channels, we may want that applied consistently to all clients.
- There is a self._gcs_node_info_stub.GetAllNodeInfo call in node_head.py, because they want the full reply whereas the Python GcsClient method only returns the partial data the original caller wanted.
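The following is a generic sketch of a retry decorator of the @_auto_reconnect shape, for illustration only; it is not Ray's implementation. The point of the list above is that a decorator like this only covers the Python call path, so tuning it never helps the C++ GcsClient used by core_worker.

```python
import functools
import time

def auto_retry(max_attempts=5, delay_s=1.0, retryable=(ConnectionError,)):
    """Retry the wrapped call on transient errors, up to max_attempts."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except retryable:
                    if attempt == max_attempts - 1:
                        raise
                    time.sleep(delay_s)  # crude backoff between attempts
        return wrapper
    return decorator
```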
What's blocking us?

Async. Cython is not known to be good at binding async APIs. We have a few challenges:
What's in this PR?
This PR introduces a NewGcsClient for all sync methods of the GcsClient, and a flag to switch back to the old PythonGcsClient implementation. The NewGcsClient is used by default; it assumes GCS is HA and always tries to reconnect if the connection breaks, up until gcs_rpc_server_reconnect_timeout_s (default = 60s), and then exits. The Python-side @auto_reconnect retries are removed, though the retry in node.py initialization (_init_gcs_client) is left for next PRs.

GcsAioClient is left intact; it now uses a thread pool executor to delegate calls to the NewGcsClient. In a later PR we will remove it for good and use real async calls (see #45289).
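A minimal sketch of that delegation pattern: async methods dispatch the blocking sync client calls to a thread pool so the event loop stays free. Class and method names are illustrative, not the actual GcsAioClient code.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

class AioDelegate:
    def __init__(self, sync_client, max_workers=4):
        self._sync_client = sync_client
        self._executor = ThreadPoolExecutor(max_workers=max_workers)

    async def get_all_node_info(self):
        loop = asyncio.get_running_loop()
        # Run the blocking call off the event loop thread and await its result.
        return await loop.run_in_executor(
            self._executor, self._sync_client.get_all_node_info
        )
```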
Detailed Changes:
- GcsClient now only retries grpc UNAVAILABLE and not other codes, since we only want to retry when GCS is down and not in other cases (e.g. RESOURCE_EXHAUSTED). See python/ray/tune/tests/test_tune_restore.py::ResourceExhaustedTest::test_resource_exhausted_info.