Skip to content

Commit f229d86

Browse files
authored
[core] Fix incorrect usage of grpc streaming API in ray syncer (#58307)
There was a video object detection Ray Data workload hang reported. An initial investigation by @jjyao and @dayshah observed that it was due to an actor restart and the actor creation task was being spilled to a raylet that had an outdated resource view. This was found by looking at the raylet state dump. This actor creation task required 1 GPU and 1 CPU, and the raylet where this actor creation task was being spilled to had a cluster view that reported no available GPUs. However there were many available GPUs, and all the other raylet state dumps correctly reported this. Furthermore in the raylet logs for the oudated raylet there was a "Failed to send a message to node: " originating from the ray syncer. Hence an initial hypothesis was formed that the ray syncer retry policy was not working as intended. A follow up investigation by @edoakes and I revealed an incorrect usage of the grpc streaming callback API. Currently how retries works in the ray syncer on fail to send/write is: - OnWriteDone/OnReadDone(ok = false) is called after a failed read/write - Disconnect() (the one in *_bidi_reactor.h!) is called which flips _disconnected to true and calls DoDisconnect() - DoDisconnect() notifies grpc we will no longer write to the channel via StartWritesDone() and removes the hold via RemoveHold() - GRPC will see that the channel is idle and has no hold so will call OnDone() - we've overriden OnDone() to hold a cleanup_cb that contains the retry policy that reinitializes the bidi reactor and connects to the same server at a repeated interval of 2 seconds until it succeeds - fault tolerance accomplished! :) However from logs that we added we weren't seeing OnDone() being called after DoDisconnect() happens. From reading the grpc streaming callback best practices here: https://grpc.io/docs/languages/cpp/best_practices/#callback-streaming-api it states that "The best practice is always to read until ok=false on the client side" From the OnDone grpc documentation: https://grpc.github.io/grpc/cpp/classgrpc_1_1_client_bidi_reactor.html#a51529f76deeda6416ce346291577ffa9: it states that "Notifies the application that all operations associated with this RPC have completed and all Holds have been removed" Since we call StartWritesDone() and removed the hold, this should notify grpc that all operations associated with this bidi reactor are completed. HOWEVER reads may not be finished, i.e. we have not read all incoming data. Consider the following scenario: 1.) We receive a bunch of resource view messages from the GCS and have not processed all of them 2.) OnWriteDone(ok = false) is called => Disconnected() => disconnected_ = false 3.) OnReadDone(ok = true) is called however because disconnected_ = true we early return and STOP processing any more reads as shown below: https://github.com/ray-project/ray/blob/275a585203bef4e48c04b46b2b7778bd8265cf46/src/ray/ray_syncer/ray_syncer_bidi_reactor_base.h#L178-L180 4.) Pending reads left in queue, and prevent grpc from calling OnDone since not all operations are done 5.) Hang, we're left in a zombie state and drop all incoming resource view messages and don't send any resource view updates due to the disconnected check Hence the solution is to remove the disconnected check in OnReadDone and simply allow all incoming data to be read. There's a couple of interesting observations/questions remaining: 1.) The raylet with the outdated view is the local raylet to the gcs and we're seeing read/write errors despite being on the same node 2.) From the logs I see that the gcs syncer thinks that the channel to the raylet syncer is still available. There's no error logs on the gcs side, its still sending messages to the raylet. Hence even though the raylet gets the "Failed to write error: " we don't see a corresponding error log on the GCS side. --------- Signed-off-by: joshlee <joshlee@anyscale.com>
1 parent 9fd7d31 commit f229d86

File tree

2 files changed

+15
-16
lines changed

2 files changed

+15
-16
lines changed

src/ray/ray_syncer/ray_syncer_bidi_reactor.h

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -56,14 +56,14 @@ using ray::rpc::syncer::ResourceViewSyncMessage;
5656
///
5757
///
5858
/// For the client side:
59-
/// +------------+ +-------------+ +------------+ gRPC error or disconnected +--------+
60-
/// | StartCall | ---> | StartRead | <---> | OnReadDone | ----------------------------> | OnDone |
61-
/// +------------+ +-------------+ +------------+ +--------+
62-
/// | ^
63-
/// | |
64-
/// v |
65-
/// +------------+ +-------------+ gRPC error or disconnected |
66-
/// | StartWrite | <--> | OnWriteDone | -------------------------------------------------------+
59+
/// +------------+ +-------------+ +------------+ gRPC error or ALL incoming data read +--------+
60+
/// | StartCall | ---> | StartRead | <---> | OnReadDone | --------------------------------------> | OnDone |
61+
/// +------------+ +-------------+ +------------+ +--------+
62+
/// | ^
63+
/// | |
64+
/// v |
65+
/// +------------+ +-------------+ gRPC error or disconnected |
66+
/// | StartWrite | <--> | OnWriteDone | ------------------------------------------------------------------+
6767
/// +------------+ +-------------+
6868
// clang-format on
6969
class RaySyncerBidiReactor {

src/ray/ray_syncer/ray_syncer_bidi_reactor_base.h

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -171,14 +171,13 @@ class RaySyncerBidiReactorBase : public RaySyncerBidiReactor, public T {
171171

172172
void OnReadDone(bool ok) override {
173173
io_context_.dispatch(
174-
[this,
175-
ok,
176-
disconnected = IsDisconnected(),
177-
msg = std::move(receiving_message_)]() mutable {
178-
if (*disconnected) {
179-
return;
180-
}
181-
174+
[this, ok, msg = std::move(receiving_message_)]() mutable {
175+
// NOTE: According to the grpc callback streaming api best practices 3.)
176+
// https://grpc.io/docs/languages/cpp/best_practices/#callback-streaming-api
177+
// The client must read all incoming data i.e. until OnReadDone(ok = false)
178+
// happens for OnDone to be called. Hence even if disconnected_ is true, we
179+
// still need to allow OnReadDone to repeatedly execute until StartReadData has
180+
// consumed all the data for OnDone to be called.
182181
if (!ok) {
183182
RAY_LOG_EVERY_MS(INFO, 1000) << "Failed to read a message from node: "
184183
<< NodeID::FromBinary(GetRemoteNodeID());

0 commit comments

Comments
 (0)