-
Notifications
You must be signed in to change notification settings - Fork 7k
[Core] Fix the issue where multiple multithreaded calls to ray.get may cause hanging. #54495
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Summary of Changes
Hello @Catch-Bull, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request delivers a crucial fix for ray.get operations in multi-threaded Ray applications. By introducing unique request identifiers for each ray.get call and updating the underlying communication and dependency management mechanisms, it ensures that concurrent object fetching requests within a single worker are properly isolated. This enhancement significantly improves the stability and reliability of Ray when dealing with multi-threaded workloads, preventing previously observed hanging issues.
Highlights
- Problem Addressed: This pull request fixes a critical issue where
ray.getoperations in multi-threaded environments could lead to hangs. Previously, a worker'sray.getstate was globally managed, causing oneray.getcall to inadvertently cancel the progress of another concurrentray.getoperation within the same worker. - Core Solution: Per-Request IDs: The fundamental change introduces a unique
request_idfor eachray.getoperation. This allows the Raylet's dependency manager to track and manage multiple concurrentray.getrequests from a single worker independently, preventing interference. - Communication Protocol Enhancements: The Raylet communication protocol (
node_manager.fbs) has been updated. Messages likeFetchOrReconstruct,NotifyUnblocked, andNotifyDirectCallTaskUnblockednow include this newrequest_id, enabling fine-grained control over object fetching and cancellation at the Raylet level. - Dependency Management Refinement: The
DependencyManagerin the Raylet has been significantly refactored. It now maintains a map of activeget_requestsper worker, keyed by theirrequest_id, and tracks object dependencies with a reference count per worker, ensuring correct management of concurrentray.getcalls. - New Test Case: A new comprehensive test,
test_ray_actor_get_in_multiple_threads, has been added. This test specifically validates the fix in a multi-threaded actor environment, simulating the problematic scenario of a long-runningray.getbeing interrupted by a short one.
Using Gemini Code Assist
The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.
Invoking Gemini
You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.
| Feature | Command | Description |
|---|---|---|
| Code Review | /gemini review |
Performs a code review for the current pull request in its current state. |
| Pull Request Summary | /gemini summary |
Provides a summary of the current pull request in its current state. |
| Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in issue comments and review comments. |
| Help | /gemini help |
Displays a list of available commands. |
Customization
To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.
Limitations & Feedback
Gemini Code Assist is currently in preview and may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments to provide feedback.
You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.
Footnotes
-
Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution. ↩
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a solution to a ray.get hanging issue in multi-threaded environments by associating each get request with a unique ID. The changes are comprehensive, spanning from the core worker and raylet to the underlying RPC protocol, and are accompanied by a targeted test case that effectively reproduces the original problem. A potential null pointer dereference has been identified and needs to be addressed.
src/ray/raylet/node_manager.cc
Outdated
| const TaskID ¤t_task_id, | ||
| bool ray_get) { | ||
| bool ray_get, | ||
| uint64_t *get_request_id) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The get_request_id parameter can be a nullptr if AsyncResolveObjects is called with its default value. However, dependency_manager_.StartOrUpdateGetRequest dereferences this pointer without a null check, which will lead to a crash if ray_get is true. While the current call sites seem to avoid this condition (e.g., ray_get is false when get_request_id is null), this is fragile and should be made more robust. Please add a check to ensure get_request_id is not null when ray_get is true.
| uint64_t *get_request_id) { | |
| if (ray_get) { | |
| RAY_CHECK(get_request_id) << "get_request_id must be provided for ray.get()"; | |
| dependency_manager_.StartOrUpdateGetRequest( | |
| worker->WorkerId(), required_object_refs, get_request_id); | |
| } else { | |
| dependency_manager_.StartOrUpdateWaitRequest(worker->WorkerId(), | |
| required_object_refs); |
ray.get in a multi-threaded environment.|
@dayshah @israbbani could you review this one? |
jjyao
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for working on the fix
src/ray/raylet/dependency_manager.cc
Outdated
| } | ||
| get_request.second = new_request_id; | ||
|
|
||
| if (*request_id != new_request_id) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when can they be the same?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Every time a GET request is sent, the request_id passed in will be 0 (since the type of this request id is uint64_t, which cannot be null, so 0 is used to represent null), and the ID of the first pull request will also be 0.
I suddenly realized that there is an issue here. If a worker sends two GET requests simultaneously, the operations on the request with ID 0 would be unsafe. Therefore, it should be required that the pull request ID starts from 1.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My mistake. I thought the request id started from 0.
| /// should be used to cancel the pull request in the object manager once the | ||
| /// worker cancels the `ray.get` request. | ||
| absl::flat_hash_map<WorkerID, std::pair<absl::flat_hash_set<ObjectID>, uint64_t>> | ||
| absl::flat_hash_map<WorkerID, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice. Needs to update the comment above to reflect the latest
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
| /// \param required_objects The objects required by the worker. | ||
| /// \param request_id The request ID. | ||
| /// \return Void. | ||
| void StartOrUpdateGetRequest(const WorkerID &worker_id, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When will we update an existing get request after this PR, shouldn't we also start a new pull with a new id?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In our current implementation of the GET request, a single GET request is divided into k batches and send k FetchRequest to local raylet. Therefore, this update is actually a merge in terms of semantics.
|
@Catch-Bull can you help me understand the exact semantics of the Previously, this concept was not exposed to the worker at all. We are adding a lot of complexity and possibility for new bugs by adding it. This might be warranted, but I would like to see if there's an alternative solution that avoids this dependency if possible. |
@edoakes I think my PR doesn't change the semantics of the request ID (I think what we are discussing here is the get request ID). In fact, both the wait request ID and the get request ID are pull request IDs returned by the pull manager. The only difference is whether the pull request is triggered by Before this PR, a core worker would only have one active get request ID. After the PR, each unfinished ray.get will have its own active get request ID. In this way, ray.get calls in different threads will no longer interfere with each other. The reason why the worker must hold the request ID is due to the current implementation of CoreWorkerPlasmaStoreProvider::Get. It does not synchronously wait for the completion of all get requests. Instead, it divides the requests into several batches, each time waiting for a shorter period of time (batch_timeout). This means it needs to send multiple get requests to try to modify the get request. In order to locate the corresponding get request, it must hold the request ID. |
|
Thanks for the explanation, it's really helpful. Let me do a little bit of digging through the code to build up my context and see if there are any alternative ways to simplify the request ID logic in general. |
|
Haven't forgotten about this, just buried in the queue |
edoakes
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The PR basically LGTM, stylistic comments
| /// No get request needs to be canceled. | ||
| RAY_RETURN_NOT_OK(raylet_client_->NotifyDirectCallTaskUnblocked( | ||
| /*get_request_id=*/std::numeric_limits<uint64_t>::max())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's give the get_request_id argument a default value that corresponds to not canceling any get request
| if (should_notify_raylet) { | ||
| RAY_CHECK_OK(raylet_client_->NotifyDirectCallTaskUnblocked()); | ||
| /// No get request needs to be canceled. | ||
| RAY_RETURN_NOT_OK(raylet_client_->NotifyDirectCallTaskUnblocked( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO we should do some cleanup of naming around these RPC methods. This should be called something like AcquireResourcesAndCancelGetRequest.
I'll take a stab at cleaning it up after this PR
| absl::flat_hash_set<uint64_t> get_request_ids; | ||
| if (message->get_request_id() < std::numeric_limits<uint64_t>::max()) { | ||
| get_request_ids.insert(message->get_request_id()); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is this a set given the size always seems to be 0 or 1?
| auto message = flatbuffers::GetRoot<protocol::NotifyUnblocked>(message_data); | ||
| AsyncResolveObjectsFinish(client, from_flatbuf<TaskID>(*message->task_id())); | ||
| absl::flat_hash_set<uint64_t> get_request_ids; | ||
| if (message->get_request_id() < std::numeric_limits<uint64_t>::max()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO it'd be more natural to use 0 as the indicator for no get request and ensure that 0 is never generated as a get request ID
| ], | ||
| indirect=True, | ||
| ) | ||
| def test_ray_actor_get_in_multiple_threads(ray_start_cluster_head): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this test doesn't belong in test_threaded_actor.py. it's not actually even using a threaded actor at the moment, it's starting its own thread instead
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe we can introduce a new file called test_api_thread_safety.py so we can centralize similar fixes in the future there
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also, I believe the bug is not specific to usage from an actor. this can also happen from a driver. should we test from both?
| # put into worker plasma. Ensure that each get returns immediately. | ||
| # Before this fix, this get would repeatedly clear the pull progress. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there is also a more deterministic way to write this test, roughly:
- Have 3 nodes (head, worker 1, worker 2).
- Run a task on worker 1 to create object 1, run a task on worker 2 to create object 2.
ray.wait(fetch_local=False)for both objects to be created- Remove node 1, which will lose the primary copy of object 1
- Call ray.get(object_1) in thread 1
- Call ray.get(object_2) in thread 2, let it complete
- Create a new version of node 1, which will allow object 1 to be reconstructed. This would hang without your change and should work with it.
|
@Catch-Bull I am going to work on cleaning up some of the naming around this area, first PR here: #55081 Will cause merge conflicts here, lmk if you want me to wait (depending on how long you think it'll take until you can address review) |
|
|
@edoakes I'm ooo recently, perhaps you could merge your PR it first. I'll revise it according to your comments when I'm back at work next week, then rebase master. Sounds good. If it gets gnarly I can help fix the conflicts too. |
The `NotifyUnblocked` naming is legacy from before Ray 1.0. Also removed `task_id` from various places that we didn't need it. Note there is an ongoing bugfix to cancel only the specific get request instead of all requests for the worker: #54495 --------- Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
The `NotifyUnblocked` naming is legacy from before Ray 1.0. Also removed `task_id` from various places that we didn't need it. Note there is an ongoing bugfix to cancel only the specific get request instead of all requests for the worker: #54495 --------- Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com> Signed-off-by: Kamil Kaczmarek <kamil@anyscale.com>
) The `NotifyUnblocked` naming is legacy from before Ray 1.0. Also removed `task_id` from various places that we didn't need it. Note there is an ongoing bugfix to cancel only the specific get request instead of all requests for the worker: ray-project#54495 --------- Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com> Signed-off-by: Michael Acar <michael.j.acar@gmail.com>
The `NotifyUnblocked` naming is legacy from before Ray 1.0. Also removed `task_id` from various places that we didn't need it. Note there is an ongoing bugfix to cancel only the specific get request instead of all requests for the worker: #54495 --------- Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com> Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
The `NotifyUnblocked` naming is legacy from before Ray 1.0. Also removed `task_id` from various places that we didn't need it. Note there is an ongoing bugfix to cancel only the specific get request instead of all requests for the worker: #54495 --------- Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com> Signed-off-by: sampan <sampan@anyscale.com>
|
This pull request has been automatically marked as stale because it has not had You can always ask for help on our discussion forum or Ray's public slack channel. If you'd like to keep this open, just leave any comment, and the stale label will be removed. |
|
@Catch-Bull just checking in, are you still planning to pick this back up? |
33a9c81 to
5284b83
Compare
@edoakes Sorry, I've been a bit busy lately, so this PR got delayed. I've already rebased it. Could you take another look? |
5284b83 to
2c206a0
Compare
|
@israbbani is going to take over reviewing (there are a few other related issues he's working on too) |
|
nevermind, it was my bad |
) The `NotifyUnblocked` naming is legacy from before Ray 1.0. Also removed `task_id` from various places that we didn't need it. Note there is an ongoing bugfix to cancel only the specific get request instead of all requests for the worker: ray-project#54495 --------- Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com> Signed-off-by: jugalshah291 <shah.jugal291@gmail.com>
|
@Catch-Bull I'll pick this up this week. I'll let you know once I've read through the description and the PR. |
|
@Catch-Bull I'm ready to review this. It looks like there are merge conflicts. Can you merge master into the branch and we can kick off CI? |
) The `NotifyUnblocked` naming is legacy from before Ray 1.0. Also removed `task_id` from various places that we didn't need it. Note there is an ongoing bugfix to cancel only the specific get request instead of all requests for the worker: ray-project#54495 --------- Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com> Signed-off-by: Douglas Strodtman <douglas@anyscale.com>
|
replaced by: #57911 |
Why are these changes needed?
In the current implementation, a worker process corresponds to only one get request ID. When any thread in the worker completes its get operation, it clears the progress of all get requests in the entire worker. This can easily lead to traffic waste in a multi-threaded environment and may even cause the following job models to hang:
Related issue number
Closes #54007
Checks
git commit -s) in this PR.scripts/format.shto lint the changes in this PR.method in Tune, I've added it in
doc/source/tune/api/under thecorresponding
.rstfile.