Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/mock/ray/gcs/store_client/in_memory_store_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class MockInMemoryStoreClient : public InMemoryStoreClient {
AsyncGet,
(const std::string &table_name,
const std::string &key,
ToPostable<OptionalItemCallback<std::string>> callback),
ToPostable<rpc::OptionalItemCallback<std::string>> callback),
(override));

MOCK_METHOD(void,
Expand Down
2 changes: 1 addition & 1 deletion src/mock/ray/gcs/store_client/redis_store_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class MockStoreClient : public StoreClient {
AsyncGet,
(const std::string &table_name,
const std::string &key,
ToPostable<OptionalItemCallback<std::string>> callback),
ToPostable<rpc::OptionalItemCallback<std::string>> callback),
(override));
MOCK_METHOD(void,
AsyncGetAll,
Expand Down
2 changes: 1 addition & 1 deletion src/mock/ray/gcs/store_client/store_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class MockStoreClient : public StoreClient {
AsyncGet,
(const std::string &table_name,
const std::string &key,
ToPostable<OptionalItemCallback<std::string>> callback),
ToPostable<rpc::OptionalItemCallback<std::string>> callback),
(override));
MOCK_METHOD(void,
AsyncGetAll,
Expand Down
64 changes: 34 additions & 30 deletions src/mock/ray/gcs_client/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,27 +23,30 @@ class MockJobInfoAccessor : public JobInfoAccessor {
MOCK_METHOD(void,
AsyncAdd,
(const std::shared_ptr<rpc::JobTableData> &data_ptr,
const StatusCallback &callback),
const rpc::StatusCallback &callback),
(override));
MOCK_METHOD(void,
AsyncMarkFinished,
(const JobID &job_id, const StatusCallback &callback),
(const JobID &job_id, const rpc::StatusCallback &callback),
(override));
MOCK_METHOD(void,
AsyncSubscribeAll,
((const SubscribeCallback<JobID, rpc::JobTableData> &subscribe),
const StatusCallback &done),
((const rpc::SubscribeCallback<JobID, rpc::JobTableData> &subscribe),
const rpc::StatusCallback &done),
(override));
MOCK_METHOD(void,
AsyncGetAll,
(const std::optional<std::string> &job_or_submission_id,
bool skip_submission_job_info_field,
bool skip_is_running_tasks_field,
const MultiItemCallback<rpc::JobTableData> &callback,
const rpc::MultiItemCallback<rpc::JobTableData> &callback,
int64_t timeout_ms),
(override));
MOCK_METHOD(void, AsyncResubscribe, (), (override));
MOCK_METHOD(void, AsyncGetNextJobID, (const ItemCallback<JobID> &callback), (override));
MOCK_METHOD(void,
AsyncGetNextJobID,
(const rpc::ItemCallback<JobID> &callback),
(override));
};

} // namespace gcs
Expand All @@ -56,35 +59,35 @@ class MockNodeInfoAccessor : public NodeInfoAccessor {
public:
MOCK_METHOD(void,
RegisterSelf,
(rpc::GcsNodeInfo && local_node_info, const StatusCallback &callback),
(rpc::GcsNodeInfo && local_node_info, const rpc::StatusCallback &callback),
(override));
MOCK_METHOD(void,
AsyncRegister,
(const rpc::GcsNodeInfo &node_info, const StatusCallback &callback),
(const rpc::GcsNodeInfo &node_info, const rpc::StatusCallback &callback),
(override));
MOCK_METHOD(void,
AsyncCheckAlive,
(const std::vector<NodeID> &node_ids,
int64_t timeout_ms,
const MultiItemCallback<bool> &callback),
const rpc::MultiItemCallback<bool> &callback),
(override));
MOCK_METHOD(void,
AsyncGetAll,
(const MultiItemCallback<rpc::GcsNodeInfo> &callback,
(const rpc::MultiItemCallback<rpc::GcsNodeInfo> &callback,
int64_t timeout_ms,
const std::vector<NodeID> &node_ids),
(override));
MOCK_METHOD(void,
AsyncGetAllNodeAddressAndLiveness,
(const MultiItemCallback<rpc::GcsNodeAddressAndLiveness> &callback,
(const rpc::MultiItemCallback<rpc::GcsNodeAddressAndLiveness> &callback,
int64_t timeout_ms,
const std::vector<NodeID> &node_ids),
(override));
MOCK_METHOD(
void,
AsyncSubscribeToNodeAddressAndLivenessChange,
(std::function<void(NodeID, const rpc::GcsNodeAddressAndLiveness &)> subscribe,
StatusCallback done),
rpc::StatusCallback done),
(override));
MOCK_METHOD(std::optional<rpc::GcsNodeAddressAndLiveness>,
GetNodeAddressAndLiveness,
Expand Down Expand Up @@ -115,11 +118,11 @@ class MockNodeResourceInfoAccessor : public NodeResourceInfoAccessor {
public:
MOCK_METHOD(void,
AsyncGetAllAvailableResources,
(const MultiItemCallback<rpc::AvailableResources> &callback),
(const rpc::MultiItemCallback<rpc::AvailableResources> &callback),
(override));
MOCK_METHOD(void,
AsyncGetAllResourceUsage,
(const ItemCallback<rpc::ResourceUsageBatchData> &callback),
(const rpc::ItemCallback<rpc::ResourceUsageBatchData> &callback),
(override));
};

Expand All @@ -144,7 +147,8 @@ class MockTaskInfoAccessor : public TaskInfoAccessor {
public:
MOCK_METHOD(void,
AsyncAddTaskEventData,
(std::unique_ptr<rpc::TaskEventData> data_ptr, StatusCallback callback),
(std::unique_ptr<rpc::TaskEventData> data_ptr,
rpc::StatusCallback callback),
(override));
};

Expand All @@ -158,27 +162,27 @@ class MockWorkerInfoAccessor : public WorkerInfoAccessor {
public:
MOCK_METHOD(void,
AsyncSubscribeToWorkerFailures,
(const ItemCallback<rpc::WorkerDeltaData> &subscribe,
const StatusCallback &done),
(const rpc::ItemCallback<rpc::WorkerDeltaData> &subscribe,
const rpc::StatusCallback &done),
(override));
MOCK_METHOD(void,
AsyncReportWorkerFailure,
(const std::shared_ptr<rpc::WorkerTableData> &data_ptr,
const StatusCallback &callback),
const rpc::StatusCallback &callback),
(override));
MOCK_METHOD(void,
AsyncGet,
(const WorkerID &worker_id,
const OptionalItemCallback<rpc::WorkerTableData> &callback),
const rpc::OptionalItemCallback<rpc::WorkerTableData> &callback),
(override));
MOCK_METHOD(void,
AsyncGetAll,
(const MultiItemCallback<rpc::WorkerTableData> &callback),
(const rpc::MultiItemCallback<rpc::WorkerTableData> &callback),
(override));
MOCK_METHOD(void,
AsyncAdd,
(const std::shared_ptr<rpc::WorkerTableData> &data_ptr,
const StatusCallback &callback),
const rpc::StatusCallback &callback),
(override));
MOCK_METHOD(void, AsyncResubscribe, (), (override));
};
Expand All @@ -198,18 +202,18 @@ class MockPlacementGroupInfoAccessor : public PlacementGroupInfoAccessor {
MOCK_METHOD(void,
AsyncGet,
(const PlacementGroupID &placement_group_id,
const OptionalItemCallback<rpc::PlacementGroupTableData> &callback),
const rpc::OptionalItemCallback<rpc::PlacementGroupTableData> &callback),
(override));
MOCK_METHOD(void,
AsyncGetByName,
(const std::string &placement_group_name,
const std::string &ray_namespace,
const OptionalItemCallback<rpc::PlacementGroupTableData> &callback,
const rpc::OptionalItemCallback<rpc::PlacementGroupTableData> &callback,
int64_t timeout_ms),
(override));
MOCK_METHOD(void,
AsyncGetAll,
(const MultiItemCallback<rpc::PlacementGroupTableData> &callback),
(const rpc::MultiItemCallback<rpc::PlacementGroupTableData> &callback),
(override));
MOCK_METHOD(Status,
SyncRemovePlacementGroup,
Expand All @@ -234,14 +238,14 @@ class MockInternalKVAccessor : public InternalKVAccessor {
(const std::string &ns,
const std::string &prefix,
const int64_t timeout_ms,
const OptionalItemCallback<std::vector<std::string>> &callback),
const rpc::OptionalItemCallback<std::vector<std::string>> &callback),
(override));
MOCK_METHOD(void,
AsyncInternalKVGet,
(const std::string &ns,
const std::string &key,
const int64_t timeout_ms,
const OptionalItemCallback<std::string> &callback),
const rpc::OptionalItemCallback<std::string> &callback),
(override));
MOCK_METHOD(void,
AsyncInternalKVPut,
Expand All @@ -250,26 +254,26 @@ class MockInternalKVAccessor : public InternalKVAccessor {
const std::string &value,
bool overwrite,
const int64_t timeout_ms,
const OptionalItemCallback<bool> &callback),
const rpc::OptionalItemCallback<bool> &callback),
(override));
MOCK_METHOD(void,
AsyncInternalKVExists,
(const std::string &ns,
const std::string &key,
const int64_t timeout_ms,
const OptionalItemCallback<bool> &callback),
const rpc::OptionalItemCallback<bool> &callback),
(override));
MOCK_METHOD(void,
AsyncInternalKVDel,
(const std::string &ns,
const std::string &key,
bool del_by_prefix,
const int64_t timeout_ms,
const OptionalItemCallback<int> &callback),
const rpc::OptionalItemCallback<int> &callback),
(override));
MOCK_METHOD(void,
AsyncGetInternalConfig,
(const OptionalItemCallback<std::string> &callback),
(const rpc::OptionalItemCallback<std::string> &callback),
(override));
};

Expand Down
24 changes: 12 additions & 12 deletions src/mock/ray/gcs_client/accessors/actor_info_accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ class FakeActorInfoAccessor : public gcs::ActorInfoAccessorInterface {

// Stub implementations for interface methods not used by this test
void AsyncGet(const ActorID &,
const gcs::OptionalItemCallback<rpc::ActorTableData> &) override {}
const rpc::OptionalItemCallback<rpc::ActorTableData> &) override {}
void AsyncGetAllByFilter(const std::optional<ActorID> &,
const std::optional<JobID> &,
const std::optional<std::string> &,
const gcs::MultiItemCallback<rpc::ActorTableData> &,
const rpc::MultiItemCallback<rpc::ActorTableData> &,
int64_t = -1) override {}
void AsyncGetByName(const std::string &,
const std::string &,
const gcs::OptionalItemCallback<rpc::ActorTableData> &,
const rpc::OptionalItemCallback<rpc::ActorTableData> &,
int64_t = -1) override {}
Status SyncGetByName(const std::string &,
const std::string &,
Expand All @@ -56,20 +56,20 @@ class FakeActorInfoAccessor : public gcs::ActorInfoAccessorInterface {
}
void AsyncReportActorOutOfScope(const ActorID &,
uint64_t,
const gcs::StatusCallback &,
const rpc::StatusCallback &,
int64_t = -1) override {}
void AsyncRegisterActor(const TaskSpecification &task_spec,
const gcs::StatusCallback &callback,
const rpc::StatusCallback &callback,
int64_t = -1) override {
async_register_actor_callback_ = callback;
}
void AsyncRestartActorForLineageReconstruction(const ActorID &,
uint64_t,
const gcs::StatusCallback &,
const rpc::StatusCallback &,
int64_t = -1) override {}
Status SyncRegisterActor(const TaskSpecification &) override { return Status::OK(); }
void AsyncKillActor(
const ActorID &, bool, bool, const gcs::StatusCallback &, int64_t = -1) override {}
const ActorID &, bool, bool, const rpc::StatusCallback &, int64_t = -1) override {}
void AsyncCreateActor(
const TaskSpecification &task_spec,
const rpc::ClientCallback<rpc::CreateActorReply> &callback) override {
Expand All @@ -78,8 +78,8 @@ class FakeActorInfoAccessor : public gcs::ActorInfoAccessorInterface {

void AsyncSubscribe(
const ActorID &actor_id,
const gcs::SubscribeCallback<ActorID, rpc::ActorTableData> &subscribe,
const gcs::StatusCallback &done) override {
const rpc::SubscribeCallback<ActorID, rpc::ActorTableData> &subscribe,
const rpc::StatusCallback &done) override {
auto callback_entry = std::make_pair(actor_id, subscribe);
callback_map_.emplace(actor_id, subscribe);
subscribe_finished_callback_map_[actor_id] = done;
Expand Down Expand Up @@ -124,14 +124,14 @@ class FakeActorInfoAccessor : public gcs::ActorInfoAccessorInterface {
return true;
}

absl::flat_hash_map<ActorID, gcs::SubscribeCallback<ActorID, rpc::ActorTableData>>
absl::flat_hash_map<ActorID, rpc::SubscribeCallback<ActorID, rpc::ActorTableData>>
callback_map_;
absl::flat_hash_map<ActorID, gcs::StatusCallback> subscribe_finished_callback_map_;
absl::flat_hash_map<ActorID, rpc::StatusCallback> subscribe_finished_callback_map_;
absl::flat_hash_map<ActorID, uint32_t> actor_subscribed_times_;

// Callbacks for AsyncCreateActor and AsyncRegisterActor
rpc::ClientCallback<rpc::CreateActorReply> async_create_actor_callback_;
gcs::StatusCallback async_register_actor_callback_;
rpc::StatusCallback async_register_actor_callback_;
};

} // namespace gcs
Expand Down
8 changes: 0 additions & 8 deletions src/ray/common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -390,14 +390,6 @@ ray_cc_library(
],
)

ray_cc_library(
name = "gcs_callback_types",
hdrs = ["gcs_callback_types.h"],
deps = [
"//src/ray/common:status",
],
)

ray_cc_library(
name = "metrics",
hdrs = ["metrics.h"],
Expand Down
55 changes: 0 additions & 55 deletions src/ray/common/gcs_callback_types.h

This file was deleted.

Loading