Skip to content

Commit a6cc549

Browse files
Kunchddavik
andauthored
[Core] Move request id creation to worker to address plasma get perf regression (#58390)
## Description This PR address the performance regression introduced in the [PR to make ray.get thread safe](#57911). Specifically, the previous PR requires the worker to block and wait for AsyncGet to return with a reply of the request id needed for correctly cleaning up get requests. This additional synchronous step causes the plasma store Get to regress in performance. This PR moves the request id generation step to the plasma store, removing the blocking step to fix the perf regression. ## Related issues - [PR which introduced perf regression](#57911) - [PR which observed the regression](#58175) ## Additional information New performance of the change measured by `ray microbenchmark`. <img width="485" height="17" alt="image" src="https://github.com/user-attachments/assets/b96b9676-3735-4e94-9ade-aaeb7514f4d0" /> Original performance prior to the change. Here we focus on the regressing `single client get calls (Plasma Store)` metric, where our new performance returns us back to the original 10k per second range compared to the existing sub 5k per second. <img width="811" height="355" alt="image" src="https://github.com/user-attachments/assets/d1fecf82-708e-48c4-9879-34c59a5e056c" /> --------- Signed-off-by: davik <davik@anyscale.com> Co-authored-by: davik <davik@anyscale.com>
1 parent 9e450e6 commit a6cc549

File tree

12 files changed

+50
-67
lines changed

12 files changed

+50
-67
lines changed

src/ray/core_worker/store_provider/plasma_store_provider.cc

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider(
7575
store_client_(std::move(store_client)),
7676
reference_counter_(reference_counter),
7777
check_signals_(std::move(check_signals)),
78-
fetch_batch_size_(fetch_batch_size) {
78+
fetch_batch_size_(fetch_batch_size),
79+
get_request_counter_(0) {
7980
if (get_current_call_site != nullptr) {
8081
get_current_call_site_ = get_current_call_site;
8182
} else {
@@ -275,8 +276,8 @@ Status CoreWorkerPlasmaStoreProvider::Get(
275276
// 1. Make the request to pull all objects into local plasma if not local already.
276277
std::vector<rpc::Address> owner_addresses =
277278
reference_counter_.GetOwnerAddresses(batch_ids);
278-
StatusOr<ipc::ScopedResponse> status_or_cleanup =
279-
raylet_ipc_client_->AsyncGetObjects(batch_ids, owner_addresses);
279+
StatusOr<ipc::ScopedResponse> status_or_cleanup = raylet_ipc_client_->AsyncGetObjects(
280+
batch_ids, owner_addresses, get_request_counter_.fetch_add(1));
280281
RAY_RETURN_NOT_OK(status_or_cleanup.status());
281282
get_request_cleanup_handlers.emplace_back(std::move(status_or_cleanup.value()));
282283

src/ray/core_worker/store_provider/plasma_store_provider.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,7 @@ class CoreWorkerPlasmaStoreProvider {
261261
// Pointer to the shared buffer tracker.
262262
std::shared_ptr<BufferTracker> buffer_tracker_;
263263
int64_t fetch_batch_size_ = 0;
264+
std::atomic<int64_t> get_request_counter_;
264265
};
265266

266267
} // namespace core

src/ray/flatbuffers/node_manager.fbs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,6 @@ enum MessageType:int {
4040
DisconnectClientReply,
4141
// Request the Raylet to pull a set of objects to the local node.
4242
AsyncGetObjectsRequest,
43-
// Reply contains the request id that will be used to clean up the request.
44-
AsyncGetObjectsReply,
4543
// Cleanup a given get request on the raylet.
4644
CancelGetRequest,
4745
// Notify the current worker is blocked for objects to become available. The raylet
@@ -134,10 +132,7 @@ table AsyncGetObjectsRequest {
134132
// Object IDs that we want the Raylet to pull locally.
135133
object_ids: [string];
136134
owner_addresses: [Address];
137-
}
138-
139-
table AsyncGetObjectsReply {
140-
request_id: long;
135+
get_request_id: long;
141136
}
142137

143138
table CancelGetRequest {

src/ray/raylet/lease_dependency_manager.cc

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,10 @@ void LeaseDependencyManager::CancelWaitRequest(const WorkerID &worker_id) {
115115
wait_requests_.erase(req_iter);
116116
}
117117

118-
GetRequestId LeaseDependencyManager::StartGetRequest(
119-
const WorkerID &worker_id, std::vector<rpc::ObjectReference> &&required_objects) {
118+
void LeaseDependencyManager::StartGetRequest(
119+
const WorkerID &worker_id,
120+
std::vector<rpc::ObjectReference> &&required_objects,
121+
int64_t get_request_id) {
120122
std::vector<ObjectID> object_ids;
121123
object_ids.reserve(required_objects.size());
122124

@@ -130,16 +132,12 @@ GetRequestId LeaseDependencyManager::StartGetRequest(
130132
uint64_t new_pull_request_id = object_manager_.Pull(
131133
std::move(required_objects), BundlePriority::GET_REQUEST, {"", false});
132134

133-
const GetRequestId get_request_id = get_request_counter_++;
134-
135135
const std::pair<WorkerID, GetRequestId> worker_and_request_ids =
136136
std::make_pair(worker_id, get_request_id);
137137

138138
get_requests_.emplace(std::move(worker_and_request_ids),
139139
std::make_pair(std::move(object_ids), new_pull_request_id));
140140
worker_to_requests_[worker_id].emplace(get_request_id);
141-
142-
return get_request_id;
143141
}
144142

145143
void LeaseDependencyManager::CancelGetRequest(const WorkerID &worker_id,

src/ray/raylet/lease_dependency_manager.h

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -135,9 +135,11 @@ class LeaseDependencyManager : public LeaseDependencyManagerInterface {
135135

136136
/// \param worker_id The ID of the worker that called `ray.get`.
137137
/// \param required_objects The objects required by the worker.
138-
/// \return the request id which will be used for cleanup.
139-
GetRequestId StartGetRequest(const WorkerID &worker_id,
140-
std::vector<rpc::ObjectReference> &&required_objects);
138+
/// \param get_request_id The ID of the get request. It is used by the worker to clean
139+
/// up a GetRequest.
140+
void StartGetRequest(const WorkerID &worker_id,
141+
std::vector<rpc::ObjectReference> &&required_objects,
142+
int64_t get_request_id);
141143

142144
/// Cleans up either an inflight or finished get request. Cancels the underlying
143145
/// pull if necessary.
@@ -302,9 +304,6 @@ class LeaseDependencyManager : public LeaseDependencyManagerInterface {
302304
/// dependencies are all local or not.
303305
absl::flat_hash_map<LeaseID, std::unique_ptr<LeaseDependencies>> queued_lease_requests_;
304306

305-
/// Used to generate monotonically increasing get request ids.
306-
GetRequestId get_request_counter_;
307-
308307
// Maps a GetRequest to the PullRequest Id and the set of ObjectIDs.
309308
// Used to cleanup a finished or cancel an inflight get request.
310309
// TODO(57911): This can be slimmed down. We do not need to track the ObjectIDs.

src/ray/raylet/node_manager.cc

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1624,21 +1624,7 @@ void NodeManager::HandleAsyncGetObjectsRequest(
16241624
auto request = flatbuffers::GetRoot<protocol::AsyncGetObjectsRequest>(message_data);
16251625
std::vector<rpc::ObjectReference> refs =
16261626
FlatbufferToObjectReferences(*request->object_ids(), *request->owner_addresses());
1627-
int64_t request_id = AsyncGet(client, refs);
1628-
flatbuffers::FlatBufferBuilder fbb;
1629-
auto get_reply = protocol::CreateAsyncGetObjectsReply(fbb, request_id);
1630-
fbb.Finish(get_reply);
1631-
Status status = client->WriteMessage(
1632-
static_cast<int64_t>(protocol::MessageType::AsyncGetObjectsReply),
1633-
fbb.GetSize(),
1634-
fbb.GetBufferPointer());
1635-
if (!status.ok()) {
1636-
DisconnectClient(client,
1637-
/*graceful=*/false,
1638-
rpc::WorkerExitType::SYSTEM_ERROR,
1639-
absl::StrFormat("Could not send AsyncGetObjectsReply because of %s",
1640-
status.ToString()));
1641-
}
1627+
AsyncGet(client, refs, request->get_request_id());
16421628
}
16431629

16441630
void NodeManager::ProcessWaitRequestMessage(
@@ -2373,15 +2359,16 @@ void NodeManager::HandleNotifyWorkerUnblocked(
23732359
}
23742360
}
23752361

2376-
int64_t NodeManager::AsyncGet(const std::shared_ptr<ClientConnection> &client,
2377-
std::vector<rpc::ObjectReference> &object_refs) {
2362+
void NodeManager::AsyncGet(const std::shared_ptr<ClientConnection> &client,
2363+
std::vector<rpc::ObjectReference> &object_refs,
2364+
int64_t get_request_id) {
23782365
std::shared_ptr<WorkerInterface> worker = worker_pool_.GetRegisteredWorker(client);
23792366
if (!worker) {
23802367
worker = worker_pool_.GetRegisteredDriver(client);
23812368
}
23822369
RAY_CHECK(worker);
2383-
return lease_dependency_manager_.StartGetRequest(worker->WorkerId(),
2384-
std::move(object_refs));
2370+
lease_dependency_manager_.StartGetRequest(
2371+
worker->WorkerId(), std::move(object_refs), get_request_id);
23852372
}
23862373

23872374
void NodeManager::AsyncWait(const std::shared_ptr<ClientConnection> &client,

src/ray/raylet/node_manager.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -429,9 +429,11 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
429429
/// \param client The client that is requesting the objects.
430430
/// \param object_refs The objects that are requested.
431431
///
432-
/// \return the request_id that will be used to cancel the get request.
433-
int64_t AsyncGet(const std::shared_ptr<ClientConnection> &client,
434-
std::vector<rpc::ObjectReference> &object_refs);
432+
/// \param get_request_id The ID of the get request. It is used by the worker to clean
433+
/// up a GetRequest.
434+
void AsyncGet(const std::shared_ptr<ClientConnection> &client,
435+
std::vector<rpc::ObjectReference> &object_refs,
436+
int64_t get_request_id);
435437

436438
/// Cancel all ongoing get requests from the client.
437439
///

src/ray/raylet/tests/lease_dependency_manager_test.cc

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -243,15 +243,14 @@ TEST_F(LeaseDependencyManagerTest, TestLeaseArgEviction) {
243243
TEST_F(LeaseDependencyManagerTest, TestCancelingSingleGetRequestForWorker) {
244244
WorkerID worker_id = WorkerID::FromRandom();
245245
int num_requests = 5;
246-
std::vector<GetRequestId> requests;
247-
for (int i = 0; i < num_requests; i++) {
246+
for (int64_t i = 0; i < num_requests; i++) {
248247
ObjectID argument_id = ObjectID::FromRandom();
249-
requests.emplace_back(lease_dependency_manager_.StartGetRequest(
250-
worker_id, ObjectIdsToRefs({argument_id})));
248+
lease_dependency_manager_.StartGetRequest(
249+
worker_id, ObjectIdsToRefs({argument_id}), i);
251250
}
252251
ASSERT_EQ(object_manager_mock_.active_get_requests.size(), num_requests);
253-
for (int i = 0; i < num_requests; i++) {
254-
lease_dependency_manager_.CancelGetRequest(worker_id, requests[i]);
252+
for (int64_t i = 0; i < num_requests; i++) {
253+
lease_dependency_manager_.CancelGetRequest(worker_id, i);
255254
ASSERT_EQ(object_manager_mock_.active_get_requests.size(), num_requests - (i + 1));
256255
}
257256
AssertNoLeaks();
@@ -260,11 +259,10 @@ TEST_F(LeaseDependencyManagerTest, TestCancelingSingleGetRequestForWorker) {
260259
TEST_F(LeaseDependencyManagerTest, TestCancelingAllGetRequestsForWorker) {
261260
WorkerID worker_id = WorkerID::FromRandom();
262261
int num_requests = 5;
263-
std::vector<GetRequestId> requests;
264-
for (int i = 0; i < num_requests; i++) {
262+
for (int64_t i = 0; i < num_requests; i++) {
265263
ObjectID argument_id = ObjectID::FromRandom();
266-
requests.emplace_back(lease_dependency_manager_.StartGetRequest(
267-
worker_id, ObjectIdsToRefs({argument_id})));
264+
lease_dependency_manager_.StartGetRequest(
265+
worker_id, ObjectIdsToRefs({argument_id}), i);
268266
}
269267
ASSERT_EQ(object_manager_mock_.active_get_requests.size(), num_requests);
270268
lease_dependency_manager_.CancelGetRequest(worker_id);

src/ray/raylet_ipc_client/fake_raylet_ipc_client.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ class FakeRayletIpcClient : public RayletIpcClientInterface {
5555

5656
StatusOr<ScopedResponse> AsyncGetObjects(
5757
const std::vector<ObjectID> &object_ids,
58-
const std::vector<rpc::Address> &owner_addresses) override {
58+
const std::vector<rpc::Address> &owner_addresses,
59+
int64_t get_request_id) override {
5960
return ScopedResponse();
6061
}
6162

src/ray/raylet_ipc_client/raylet_ipc_client.cc

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -195,23 +195,22 @@ Status RayletIpcClient::ActorCreationTaskDone() {
195195

196196
StatusOr<ScopedResponse> RayletIpcClient::AsyncGetObjects(
197197
const std::vector<ObjectID> &object_ids,
198-
const std::vector<rpc::Address> &owner_addresses) {
198+
const std::vector<rpc::Address> &owner_addresses,
199+
int64_t get_request_id) {
199200
RAY_CHECK(object_ids.size() == owner_addresses.size());
200201
flatbuffers::FlatBufferBuilder fbb;
201202
auto object_ids_message = flatbuf::to_flatbuf(fbb, object_ids);
202-
auto message = protocol::CreateAsyncGetObjectsRequest(
203-
fbb, object_ids_message, AddressesToFlatbuffer(fbb, owner_addresses));
203+
auto message =
204+
protocol::CreateAsyncGetObjectsRequest(fbb,
205+
object_ids_message,
206+
AddressesToFlatbuffer(fbb, owner_addresses),
207+
get_request_id);
204208
fbb.Finish(message);
205209
std::vector<uint8_t> reply;
206210
// TODO(57923): This should be FATAL. Local sockets are reliable. If a worker is unable
207211
// to communicate with the raylet, there's no way to recover.
208-
RAY_RETURN_NOT_OK(AtomicRequestReply(MessageType::AsyncGetObjectsRequest,
209-
MessageType::AsyncGetObjectsReply,
210-
&reply,
211-
&fbb));
212-
auto reply_message = flatbuffers::GetRoot<protocol::AsyncGetObjectsReply>(reply.data());
213-
int64_t request_id = reply_message->request_id();
214-
return ScopedResponse([this, request_id_to_cleanup = request_id]() {
212+
RAY_RETURN_NOT_OK(WriteMessage(MessageType::AsyncGetObjectsRequest, &fbb));
213+
return ScopedResponse([this, request_id_to_cleanup = get_request_id]() {
215214
return CancelGetRequest(request_id_to_cleanup);
216215
});
217216
}

0 commit comments

Comments
 (0)