[Test] Revert "[Core] Join Ray Jobs API JobInfo with GCS JobTableData (#… #32283

Closed
25 changes: 1 addition & 24 deletions dashboard/modules/job/common.py
@@ -61,10 +61,7 @@ def is_terminal(self) -> bool:
@PublicAPI(stability="stable")
@dataclass
class JobInfo:
"""A class for recording information associated with a job and its execution.

Please keep this in sync with the JobsAPIInfo proto in src/ray/protobuf/gcs.proto.
"""
"""A class for recording information associated with a job and its execution."""

#: The status of the job.
status: JobStatus
@@ -130,9 +127,6 @@ def __post_init__(self):
def to_json(self) -> Dict[str, Any]:
"""Convert this object to a JSON-serializable dictionary.

Note that the runtime_env field is converted to a JSON-serialized string
and the field is renamed to runtime_env_json.

Returns:
A JSON-serializable dictionary representing the JobInfo object.
"""
@@ -142,12 +136,6 @@ def to_json(self) -> Dict[str, Any]:
# Convert enum values to strings.
json_dict["status"] = str(json_dict["status"])

# Convert runtime_env to a JSON-serialized string.
if "runtime_env" in json_dict:
if json_dict["runtime_env"] is not None:
json_dict["runtime_env_json"] = json.dumps(json_dict["runtime_env"])
del json_dict["runtime_env"]

# Assert that the dictionary is JSON-serializable.
json.dumps(json_dict)

@@ -157,21 +145,12 @@ def to_json(self) -> Dict[str, Any]:
def from_json(cls, json_dict: Dict[str, Any]) -> None:
"""Initialize this object from a JSON dictionary.

Note that the runtime_env_json field is converted to a dictionary and
the field is renamed to runtime_env.

Args:
json_dict: A JSON dictionary to use to initialize the JobInfo object.
"""
# Convert enum values to enum objects.
json_dict["status"] = JobStatus(json_dict["status"])

# Convert runtime_env from a JSON-serialized string to a dictionary.
if "runtime_env_json" in json_dict:
if json_dict["runtime_env_json"] is not None:
json_dict["runtime_env"] = json.loads(json_dict["runtime_env_json"])
del json_dict["runtime_env_json"]

return cls(**json_dict)


@@ -180,8 +159,6 @@ class JobInfoStorageClient:
Interface to put and get job data from the Internal KV store.
"""

# Please keep this format in sync with JobDataKey()
# in src/ray/gcs/gcs_server/gcs_job_manager.h.
JOB_DATA_KEY_PREFIX = f"{ray_constants.RAY_INTERNAL_NAMESPACE_PREFIX}job_info_"
JOB_DATA_KEY = f"{JOB_DATA_KEY_PREFIX}{{job_id}}"
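To make the effect of this hunk concrete, here is a minimal sketch of the reverted serialization behavior. It is an illustration, not code from the PR; it assumes `JobStatus` is importable from the same `common` module as `JobInfo` (as the test file below does), and it mirrors the expected payload in `test_job_info_to_json`: `runtime_env` stays a plain dict and there is no `runtime_env_json` key.

```python
# Sketch of the reverted JobInfo round trip (illustration only).
from ray.dashboard.modules.job.common import JobInfo, JobStatus

info = JobInfo(
    status=JobStatus.PENDING,
    entrypoint="echo hi",
    runtime_env={"pip": ["pkg"]},
)
payload = info.to_json()
assert payload["status"] == "PENDING"              # enum values are stringified
assert payload["runtime_env"] == {"pip": ["pkg"]}  # stays a dict; no runtime_env_json key
assert "runtime_env_json" not in payload

restored = JobInfo.from_json(payload)              # status converted back to a JobStatus
assert restored.status is JobStatus.PENDING
assert restored.runtime_env == {"pip": ["pkg"]}
```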

58 changes: 1 addition & 57 deletions dashboard/modules/job/tests/test_common.py
@@ -1,5 +1,4 @@
import pytest
import json

from ray.dashboard.modules.job.common import (
JobInfo,
@@ -10,9 +9,6 @@
JobSubmitRequest,
)

from ray.core.generated.gcs_pb2 import JobsAPIInfo
from google.protobuf.json_format import Parse


class TestJobSubmitRequestValidation:
def test_validate_entrypoint(self):
@@ -155,7 +151,7 @@ def test_job_info_to_json():
"entrypoint_num_cpus": 1,
"entrypoint_num_gpus": 1,
"entrypoint_resources": {"Custom": 1},
"runtime_env_json": '{"pip": ["pkg"]}',
"runtime_env": {"pip": ["pkg"]},
}

# Check that the expected items are in the JSON.
@@ -169,58 +165,6 @@ def test_job_info_to_json():
assert isinstance(new_job_info.status, JobStatus)


def test_job_info_json_to_proto():
"""Test that JobInfo JSON can be converted to JobsAPIInfo protobuf."""
info = JobInfo(
status=JobStatus.PENDING,
entrypoint="echo hi",
error_type="error_type",
start_time=123,
end_time=456,
metadata={"hi": "hi2"},
entrypoint_num_cpus=1,
entrypoint_num_gpus=1,
entrypoint_resources={"Custom": 1},
runtime_env={"pip": ["pkg"]},
driver_agent_http_address="http://localhost:1234",
driver_node_id="node_id",
)
info_json = json.dumps(info.to_json())
info_proto = Parse(info_json, JobsAPIInfo())
assert info_proto.status == "PENDING"
assert info_proto.entrypoint == "echo hi"
assert info_proto.start_time == 123
assert info_proto.end_time == 456
assert info_proto.metadata == {"hi": "hi2"}
assert info_proto.entrypoint_num_cpus == 1
assert info_proto.entrypoint_num_gpus == 1
assert info_proto.entrypoint_resources == {"Custom": 1}
assert info_proto.runtime_env_json == '{"pip": ["pkg"]}'
assert info_proto.message == (
"Job has not started yet. It may be waiting for resources "
"(CPUs, GPUs, custom resources) to become available. "
"It may be waiting for the runtime environment to be set up."
)
assert info_proto.error_type == "error_type"
assert info_proto.driver_agent_http_address == "http://localhost:1234"
assert info_proto.driver_node_id == "node_id"

minimal_info = JobInfo(status=JobStatus.PENDING, entrypoint="echo hi")
minimal_info_json = json.dumps(minimal_info.to_json())
minimal_info_proto = Parse(minimal_info_json, JobsAPIInfo())
assert minimal_info_proto.status == "PENDING"
assert minimal_info_proto.entrypoint == "echo hi"
for unset_optional_field in [
"entrypoint_num_cpus",
"entrypoint_num_gpus",
"runtime_env_json",
"error_type",
"driver_agent_http_address",
"driver_node_id",
]:
assert not minimal_info_proto.HasField(unset_optional_field)


if __name__ == "__main__":
import sys

2 changes: 0 additions & 2 deletions python/ray/_private/ray_constants.py
@@ -355,8 +355,6 @@ def env_bool(key, default):
# Prefix for namespaces which are used internally by ray.
# Jobs within these namespaces should be hidden from users
# and should not be considered user activity.
# Please keep this in sync with the definition kRayInternalNamespacePrefix
# in /src/ray/gcs/gcs_server/gcs_job_manager.h.
RAY_INTERNAL_NAMESPACE_PREFIX = "_ray_internal_"
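For context, the prefix above is what `JobInfoStorageClient.JOB_DATA_KEY_PREFIX` builds on in the `common.py` diff. A small illustration (not code from the PR; the submission id is hypothetical) of how the pieces compose into the internal KV key:

```python
# How the internal-namespace prefix composes into the job-info KV key (illustration only).
RAY_INTERNAL_NAMESPACE_PREFIX = "_ray_internal_"
JOB_DATA_KEY_PREFIX = f"{RAY_INTERNAL_NAMESPACE_PREFIX}job_info_"
JOB_DATA_KEY = f"{JOB_DATA_KEY_PREFIX}{{job_id}}"

# "raysubmit_xyz" is a made-up submission id used purely for illustration.
assert JOB_DATA_KEY.format(job_id="raysubmit_xyz") == "_ray_internal_job_info_raysubmit_xyz"
```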


2 changes: 1 addition & 1 deletion python/ray/_private/worker.py
@@ -1912,7 +1912,7 @@ def connect(
it during startup as a command line argument.
ray_debugger_external: If True, make the debugger external to the
node this worker is running on.
entrypoint: The name of the entrypoint script. Ignored if the
entrypoint: The name of the entrypoint script. Ignored unless the
mode != SCRIPT_MODE
"""
# Do some basic checking to make sure we didn't call ray.init twice.
74 changes: 0 additions & 74 deletions src/mock/ray/gcs/gcs_server/gcs_kv_manager.h
@@ -61,79 +61,5 @@ class MockInternalKVInterface : public ray::gcs::InternalKVInterface {
(override));
};

// Fake internal KV interface that simply stores keys and values in a C++ map.
// Only supports Put and Get.
// Warning: Naively prepends the namespace to the key, so e.g.
// the (namespace, key) pairs ("a", "bc") and ("ab", "c") will collide which is a bug.

class FakeInternalKVInterface : public ray::gcs::InternalKVInterface {
public:
FakeInternalKVInterface() {}

// The C++ map.
std::unordered_map<std::string, std::string> kv_store_ = {};

void Get(const std::string &ns,
const std::string &key,
std::function<void(std::optional<std::string>)> callback) override {
std::string full_key = ns + key;
auto it = kv_store_.find(full_key);
if (it == kv_store_.end()) {
callback(std::nullopt);
} else {
callback(it->second);
}
}

void MultiGet(const std::string &ns,
const std::vector<std::string> &keys,
std::function<void(std::unordered_map<std::string, std::string>)>
callback) override {
std::unordered_map<std::string, std::string> result;
for (const auto &key : keys) {
std::string full_key = ns + key;
auto it = kv_store_.find(full_key);
if (it != kv_store_.end()) {
result[key] = it->second;
}
}
callback(result);
}

void Put(const std::string &ns,
const std::string &key,
const std::string &value,
bool overwrite,
std::function<void(bool)> callback) override {
std::string full_key = ns + key;
if (kv_store_.find(full_key) != kv_store_.end() && !overwrite) {
callback(false);
} else {
kv_store_[full_key] = value;
callback(true);
}
}

MOCK_METHOD(void,
Del,
(const std::string &ns,
const std::string &key,
bool del_by_prefix,
std::function<void(int64_t)> callback),
(override));
MOCK_METHOD(void,
Exists,
(const std::string &ns,
const std::string &key,
std::function<void(bool)> callback),
(override));
MOCK_METHOD(void,
Keys,
(const std::string &ns,
const std::string &prefix,
std::function<void(std::vector<std::string>)> callback),
(override));
};

} // namespace gcs
} // namespace ray
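The removed `FakeInternalKVInterface` carries a warning about its naive key construction. A tiny Python sketch of that collision, offered only as an illustration of the comment in the deleted C++:

```python
# The removed fake builds its storage key by prepending the namespace to the key
# with no separator, so distinct (namespace, key) pairs can collide.
def full_key(ns: str, key: str) -> str:
    return ns + key  # same concatenation scheme as the fake's Get/Put/MultiGet

assert full_key("a", "bc") == full_key("ab", "c") == "abc"  # two different pairs collide
```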
20 changes: 0 additions & 20 deletions src/ray/gcs/gcs_client/test/global_state_accessor_test.cc
@@ -125,26 +125,6 @@ TEST_P(GlobalStateAccessorTest, TestJobTable) {
ASSERT_EQ(global_state_->GetAllJobInfo().size(), job_count);
}

// Test GetAllJobInfo where some jobs were submitted by the Ray Job API (i.e. they have
// job_submission_id set).
TEST_P(GlobalStateAccessorTest, TestJobTableWithSubmissionId) {
int job_count = 100;
ASSERT_EQ(global_state_->GetAllJobInfo().size(), 0);
for (int index = 0; index < job_count; ++index) {
auto job_id = JobID::FromInt(index);
auto job_table_data = Mocker::GenJobTableData(job_id);
if (index % 2 == 0) {
(*job_table_data->mutable_config()->mutable_metadata())["job_submission_id"] =
std::to_string(index);
}
std::promise<bool> promise;
RAY_CHECK_OK(gcs_client_->Jobs().AsyncAdd(
job_table_data, [&promise](Status status) { promise.set_value(status.ok()); }));
promise.get_future().get();
}
ASSERT_EQ(global_state_->GetAllJobInfo().size(), job_count);
}

TEST_P(GlobalStateAccessorTest, TestNodeTable) {
int node_count = 100;
ASSERT_EQ(global_state_->GetAllNodeInfo().size(), 0);
70 changes: 3 additions & 67 deletions src/ray/gcs/gcs_server/gcs_job_manager.cc
@@ -14,7 +14,6 @@

#include "ray/gcs/gcs_server/gcs_job_manager.h"

#include "ray/gcs/gcs_client/accessor.h"
#include "ray/gcs/pb_util.h"

namespace ray {
@@ -147,76 +146,13 @@ void GcsJobManager::HandleGetAllJobInfo(rpc::GetAllJobInfoRequest request,
rpc::GetAllJobInfoReply *reply,
rpc::SendReplyCallback send_reply_callback) {
RAY_LOG(INFO) << "Getting all job info.";

int limit = std::numeric_limits<int>::max();
if (request.has_limit()) {
limit = request.limit();
if (limit < 0) {
RAY_LOG(ERROR) << "Invalid limit " << limit
<< " specified in GetAllJobInfoRequest, "
<< "must be nonnegative.";
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::Invalid("Invalid limit"));
return;
}
RAY_LOG(INFO) << "Getting job info with limit " << limit << ".";
}

auto on_done = [this, reply, send_reply_callback, limit](
auto on_done = [reply, send_reply_callback](
const absl::flat_hash_map<JobID, JobTableData> &result) {
// Internal KV keys for jobs that were submitted via the Ray Job API.
std::vector<std::string> job_api_data_keys;

// Maps job data keys to the index of the job in the table.
std::unordered_map<std::string, int> job_data_key_to_index;

// Load the job table data into the reply.
int i = 0;
for (auto &data : result) {
if (i >= limit) {
break;
}
reply->add_job_info_list()->CopyFrom(data.second);
auto &metadata = data.second.config().metadata();
auto iter = metadata.find("job_submission_id");
if (iter != metadata.end()) {
// This job was submitted via the Ray Job API, so it has JobInfo in the kv.
std::string job_submission_id = iter->second;
std::string job_data_key = JobDataKey(job_submission_id);
job_api_data_keys.push_back(job_data_key);
job_data_key_to_index[job_data_key] = i;
}
i++;
}

RAY_CHECK(job_api_data_keys.size() == job_data_key_to_index.size());

auto kv_multi_get_callback =
[reply, send_reply_callback, job_data_key_to_index](
std::unordered_map<std::string, std::string> result) {
for (auto &data : result) {
std::string job_data_key = data.first;
// The JobInfo stored by the Ray Job API.
std::string job_info_json = data.second;
if (!job_info_json.empty()) {
// Parse the JSON into a JobsAPIInfo proto.
rpc::JobsAPIInfo jobs_api_info;
auto status = google::protobuf::util::JsonStringToMessage(job_info_json,
&jobs_api_info);
if (!status.ok()) {
RAY_LOG(ERROR)
<< "Failed to parse JobInfo JSON into JobsAPIInfo protobuf. JSON: "
<< job_info_json << " Error: " << status.message();
}
// Add the JobInfo to the correct index in the reply.
reply->mutable_job_info_list(job_data_key_to_index.at(job_data_key))
->mutable_job_info()
->CopyFrom(std::move(jobs_api_info));
}
}
RAY_LOG(INFO) << "Finished getting all job info.";
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
};
internal_kv_.MultiGet("job", job_api_data_keys, kv_multi_get_callback);
RAY_LOG(INFO) << "Finished getting all job info.";
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
};
Status status = gcs_table_storage_->JobTable().GetAll(on_done);
if (!status.ok()) {
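For reviewers skimming the C++: the hunk above removes the join between GCS `JobTableData` and the Jobs API `JobInfo` stored in the internal KV. A rough Python rendering of the deleted logic, offered only as an illustration distilled from the removed code (dicts stand in for the protobuf and KV types; `kv_multi_get` and `parse_jobs_api_info` are hypothetical stand-ins for InternalKV `MultiGet` and `JsonStringToMessage`):

```python
# Rough illustration (not code from this PR) of the join HandleGetAllJobInfo
# performed before this revert.
def join_job_info(job_table, kv_multi_get, parse_jobs_api_info, limit=None):
    reply, key_to_index = [], {}
    for i, data in enumerate(job_table.values()):
        if limit is not None and i >= limit:
            break
        reply.append(dict(data))  # copy the GCS JobTableData entry into the reply
        submission_id = data.get("config", {}).get("metadata", {}).get("job_submission_id")
        if submission_id is not None:
            # Jobs submitted via the Ray Jobs API also have JobInfo in the internal KV.
            key_to_index["_ray_internal_job_info_" + submission_id] = i
    # Batch-fetch the JobInfo JSON blobs and attach each to the matching reply entry.
    for key, info_json in kv_multi_get(list(key_to_index)).items():
        if info_json:
            reply[key_to_index[key]]["job_info"] = parse_jobs_api_info(info_json)
    return reply
```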