Skip to content

Commit

Permalink
[Job API] Handle multiple drivers with same job submission id in GCS …
Browse files Browse the repository at this point in the history
…GetAllJobInfo endpoint (#32388) (#32426)

The changes to the GetAllJobInfo endpoint in #31046 did not handle the possibility that multiple job table jobs (drivers) could have the same submission_id. This can actually happen, for example if there are multiple ray.init() calls in a Ray Job API entrypoint command. The GCS would crash in this case due to failing a RAY_CHECK that the number of jobs equaled the number of submission_ids seen.

This PR updates the endpoint to handle the above possibility, and adds a unit test which fails without this PR.

Related issue number
Closes #32213

Co-authored-by: Archit Kulkarni <architkulkarni@users.noreply.github.com>
  • Loading branch information
cadedaniel and architkulkarni authored Feb 10, 2023
1 parent f5ac58a commit 08b2fa2
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 10 deletions.
22 changes: 12 additions & 10 deletions src/ray/gcs/gcs_server/gcs_job_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,10 @@ void GcsJobManager::HandleGetAllJobInfo(rpc::GetAllJobInfoRequest request,
// Internal KV keys for jobs that were submitted via the Ray Job API.
std::vector<std::string> job_api_data_keys;

// Maps job data keys to the index of the job in the table.
std::unordered_map<std::string, int> job_data_key_to_index;
// Maps a Job API data key to the indices of the corresponding jobs in the table. Note
// that multiple jobs can come from the same Ray Job API submission (e.g. if the
// entrypoint script calls ray.init() multiple times).
std::unordered_map<std::string, std::vector<int>> job_data_key_to_indices;

// Load the job table data into the reply.
int i = 0;
Expand All @@ -183,15 +185,14 @@ void GcsJobManager::HandleGetAllJobInfo(rpc::GetAllJobInfoRequest request,
std::string job_submission_id = iter->second;
std::string job_data_key = JobDataKey(job_submission_id);
job_api_data_keys.push_back(job_data_key);
job_data_key_to_index[job_data_key] = i;
job_data_key_to_indices[job_data_key].push_back(i);
}
i++;
}

RAY_CHECK(job_api_data_keys.size() == job_data_key_to_index.size());

// Load the JobInfo for jobs submitted via the Ray Job API.
auto kv_multi_get_callback =
[reply, send_reply_callback, job_data_key_to_index](
[reply, send_reply_callback, job_data_key_to_indices](
std::unordered_map<std::string, std::string> result) {
for (auto &data : result) {
std::string job_data_key = data.first;
Expand All @@ -207,10 +208,11 @@ void GcsJobManager::HandleGetAllJobInfo(rpc::GetAllJobInfoRequest request,
<< "Failed to parse JobInfo JSON into JobsAPIInfo protobuf. JSON: "
<< job_info_json << " Error: " << status.message();
}
// Add the JobInfo to the correct index in the reply.
reply->mutable_job_info_list(job_data_key_to_index.at(job_data_key))
->mutable_job_info()
->CopyFrom(std::move(jobs_api_info));
// Add the JobInfo to the correct indices in the reply.
for (int i : job_data_key_to_indices.at(job_data_key)) {
reply->mutable_job_info_list(i)->mutable_job_info()->CopyFrom(
std::move(jobs_api_info));
}
}
}
RAY_LOG(INFO) << "Finished getting all job info.";
Expand Down
29 changes: 29 additions & 0 deletions src/ray/gcs/gcs_server/test/gcs_job_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,35 @@ TEST_F(GcsJobManagerTest, TestGetAllJobInfo) {

// Make sure the GCS didn't hang or crash.
all_job_info_promise3.get_future().get();

// Add another job with the *same* submission ID. This can happen if the entrypoint
// script calls ray.init() multiple times.
auto job_id2 = JobID::FromInt(2);
auto add_job_request2 =
Mocker::GenAddJobRequest(job_id2, "namespace_100", submission_id);
std::promise<bool> promise4;
gcs_job_manager.HandleAddJob(
*add_job_request2,
&empty_reply,
[&promise4](Status, std::function<void()>, std::function<void()>) {
promise4.set_value(true);
});
promise4.get_future().get();

// Get all job info again.
rpc::GetAllJobInfoRequest all_job_info_request4;
rpc::GetAllJobInfoReply all_job_info_reply4;
std::promise<bool> all_job_info_promise4;

gcs_job_manager.HandleGetAllJobInfo(
all_job_info_request4,
&all_job_info_reply4,
[&all_job_info_promise4](Status, std::function<void()>, std::function<void()>) {
all_job_info_promise4.set_value(true);
});
all_job_info_promise4.get_future().get();

ASSERT_EQ(all_job_info_reply4.job_info_list().size(), 101);
}

TEST_F(GcsJobManagerTest, TestGetAllJobInfoWithLimit) {
Expand Down

0 comments on commit 08b2fa2

Please sign in to comment.