-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ray_client] close ray connection upon client deactivation #13919
[ray_client] close ray connection upon client deactivation #13919
Conversation
Signed-off-by: Richard Liaw <rliaw@berkeley.edu>
Signed-off-by: Richard Liaw <rliaw@berkeley.edu>
python/ray/util/client/worker.py
Outdated
break | ||
# Ray is not ready yet, wait a timeout | ||
time.sleep(timeout) | ||
# # Now the HTTP2 channel is ready, or proxied, but the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm what problem was this causing? Shouldn't is_initialized still work as before after the client connects?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh I see, the data connection isn't added until later. How about we add a new dummy ping operation here then instead of is_initialized()? It can be anything really, is_initialized was just used as a convenient no_op.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is_initialized()
also had the advantage of checking if ray was initialized, which is kinda useful here. What you might want to do though is do the data connection earlier. you might need to wait for /it/ to return ready (meaning the send/receive loop thread pair is kicked off and sets a flag, or even better, returns its client_id) -- but in general, yeah, some op here is useful.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK i did something, take a look! happy to hear any suggestions
Great start! Comments inline.
Yeah this is probably fine for now, if we consider the common use case here a cluster to have one or zero clients. There might be extra clients attached just for debugging purposes. |
address=args.redis_address, | ||
_redis_password=args.redis_password) | ||
|
||
def ray_connect_handler(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe make this top-level and pass in redis_address/redis_password?
python/ray/util/client/worker.py
Outdated
break | ||
# Ray is not ready yet, wait a timeout | ||
time.sleep(timeout) | ||
# # Now the HTTP2 channel is ready, or proxied, but the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is_initialized()
also had the advantage of checking if ray was initialized, which is kinda useful here. What you might want to do though is do the data connection earlier. you might need to wait for /it/ to return ready (meaning the send/receive loop thread pair is kicked off and sets a flag, or even better, returns its client_id) -- but in general, yeah, some op here is useful.
src/ray/protobuf/ray_client.proto
Outdated
@@ -139,6 +139,7 @@ message ClusterInfoType { | |||
CLUSTER_RESOURCES = 2; | |||
AVAILABLE_RESOURCES = 3; | |||
RUNTIME_CONTEXT = 4; | |||
IS_ALIVE = 5; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if a protocol change is strictly necessary -- we know it's alive, for instance, if the handshake from the data channel completes (which is what you'd want to test anyway)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah agreed; I'm ok with this though as an explicit check since it seems straightforward.
This reverts commit 0410c37.
Signed-off-by: Richard Liaw <rliaw@berkeley.edu>
ray.disconnect() | ||
finally: | ||
ray_client_server.shutdown_with_server(server_handle.grpc_server) | ||
time.sleep(2) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just moved this stuff into the fixture
python/ray/util/client/worker.py
Outdated
raise ConnectionError("ray client connection timeout") | ||
|
||
# Initialize the streams to finish protocol negotiation. | ||
self.data_client = DataClient(self.channel, self._client_id, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@barakmich I just moved this into the try-except
python/ray/util/client/worker.py
Outdated
# Ray is not ready yet, wait a timeout | ||
time.sleep(timeout) | ||
# Initialize the streams to finish protocol negotiation. | ||
self.data_client = DataClient(self.channel, self._client_id, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The constructor of dataclient doesn't look like it actually does anything, so this won't suffice as a health check. How about we just add a dummy ping RPC instead and preserve the original code structure here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dumb question - what's the easiest way to implement a ping here? Should I modify the protobuf struct to have a dummy value that I will handle on the server side?
Probably you can follow the existing is_initialized code as an example and
copy it to return "ok".
…On Fri, Feb 5, 2021, 12:29 AM Richard Liaw ***@***.***> wrote:
***@***.**** commented on this pull request.
------------------------------
In python/ray/util/client/worker.py
<#13919 (comment)>:
> @@ -94,16 +94,11 @@ def __init__(self,
# RayletDriverStub, allowing for unary requests.
self.server = ray_client_pb2_grpc.RayletDriverStub(
self.channel)
- # Now the HTTP2 channel is ready, or proxied, but the
- # servicer may not be ready. Call is_initialized() and if
- # it throws, the servicer is not ready. On success, the
- # `ray_ready` result is checked.
- ray_ready = self.is_initialized()
- if ray_ready:
- # Ray is ready! Break out of the retry loop
- break
- # Ray is not ready yet, wait a timeout
- time.sleep(timeout)
+ # Initialize the streams to finish protocol negotiation.
+ self.data_client = DataClient(self.channel, self._client_id,
Dumb question - what's the easiest way to implement a ping here? Should I
modify the protobuf struct to have a dummy value?
—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
<#13919 (comment)>, or
unsubscribe
<https://github.com/notifications/unsubscribe-auth/AAADUSXNZYNITJHTJZWWCIDS5OT7RANCNFSM4XDYT2SQ>
.
|
OK, ptal, i tried something
…On Fri, Feb 5, 2021 at 12:48 AM Eric Liang ***@***.***> wrote:
Probably you can follow the existing is_initialized code as an example and
copy it to return "ok".
On Fri, Feb 5, 2021, 12:29 AM Richard Liaw ***@***.***>
wrote:
> ***@***.**** commented on this pull request.
> ------------------------------
>
> In python/ray/util/client/worker.py
> <#13919 (comment)>:
>
> > @@ -94,16 +94,11 @@ def __init__(self,
> # RayletDriverStub, allowing for unary requests.
> self.server = ray_client_pb2_grpc.RayletDriverStub(
> self.channel)
> - # Now the HTTP2 channel is ready, or proxied, but the
> - # servicer may not be ready. Call is_initialized() and if
> - # it throws, the servicer is not ready. On success, the
> - # `ray_ready` result is checked.
> - ray_ready = self.is_initialized()
> - if ray_ready:
> - # Ray is ready! Break out of the retry loop
> - break
> - # Ray is not ready yet, wait a timeout
> - time.sleep(timeout)
> + # Initialize the streams to finish protocol negotiation.
> + self.data_client = DataClient(self.channel, self._client_id,
>
> Dumb question - what's the easiest way to implement a ping here? Should I
> modify the protobuf struct to have a dummy value?
>
> —
> You are receiving this because you were mentioned.
> Reply to this email directly, view it on GitHub
> <#13919 (comment)>,
or
> unsubscribe
> <
https://github.com/notifications/unsubscribe-auth/AAADUSXNZYNITJHTJZWWCIDS5OT7RANCNFSM4XDYT2SQ
>
> .
>
—
You are receiving this because you authored the thread.
Reply to this email directly, view it on GitHub
<#13919 (comment)>,
or unsubscribe
<https://github.com/notifications/unsubscribe-auth/ABCRZZNEY7NUSSNOQAV7CYLS5OWFLANCNFSM4XDYT2SQ>
.
|
Test client failing |
Signed-off-by: Richard Liaw <rliaw@berkeley.edu>
…ay-project#13919)" This reverts commit 8787c70.
Why are these changes needed?
Disconnects from Ray when num_clients drops to 0. This kills actors/tasks when job disconnects.
However, I think this doesn't really work if you have multiple clients.
Questions:
I disabled a check for ray initialization upon client connection. Is this safe? (probably not?)FixedShould we create a new protobuf type for "servicer alive" (as opposed to servicer alive == ray connected)?What's the right way to write a test here?DoneRelated issue number
Closes #13354
Checks
scripts/format.sh
to lint the changes in this PR.