1515#include " ray/core_worker/store_provider/plasma_store_provider.h"
1616
1717#include < algorithm>
18- #include < cstdint>
1918#include < memory>
2019#include < string>
2120#include < utility>
@@ -178,20 +177,23 @@ Status CoreWorkerPlasmaStoreProvider::Release(const ObjectID &object_id) {
178177 return store_client_->Release (object_id);
179178}
180179
181- Status CoreWorkerPlasmaStoreProvider::GetObjectsFromPlasmaStore (
180+ Status CoreWorkerPlasmaStoreProvider::PullObjectsAndGetFromPlasmaStore (
182181 absl::flat_hash_set<ObjectID> &remaining,
183- const std::vector<ObjectID> &ids ,
182+ const std::vector<ObjectID> &batch_ids ,
184183 int64_t timeout_ms,
185184 absl::flat_hash_map<ObjectID, std::shared_ptr<RayObject>> *results,
186185 bool *got_exception) {
186+ const auto owner_addresses = reference_counter_.GetOwnerAddresses (batch_ids);
187+ RAY_RETURN_NOT_OK (raylet_ipc_client_->AsyncGetObjects (batch_ids, owner_addresses));
188+
187189 std::vector<plasma::ObjectBuffer> plasma_results;
188- RAY_RETURN_NOT_OK (store_client_->Get (ids , timeout_ms, &plasma_results));
190+ RAY_RETURN_NOT_OK (store_client_->Get (batch_ids , timeout_ms, &plasma_results));
189191
190192 // Add successfully retrieved objects to the result map and remove them from
191193 // the set of IDs to get.
192194 for (size_t i = 0 ; i < plasma_results.size (); i++) {
193195 if (plasma_results[i].data != nullptr || plasma_results[i].metadata != nullptr ) {
194- const auto &object_id = ids [i];
196+ const auto &object_id = batch_ids [i];
195197 std::shared_ptr<TrackedBuffer> data = nullptr ;
196198 std::shared_ptr<Buffer> metadata = nullptr ;
197199 if (plasma_results[i].data && plasma_results[i].data ->Size () > 0 ) {
@@ -214,6 +216,7 @@ Status CoreWorkerPlasmaStoreProvider::GetObjectsFromPlasmaStore(
214216 (*results)[object_id] = std::move (result_object);
215217 }
216218 }
219+
217220 return Status::OK ();
218221}
219222
@@ -251,52 +254,57 @@ Status CoreWorkerPlasmaStoreProvider::GetExperimentalMutableObject(
251254 return store_client_->GetExperimentalMutableObject (object_id, mutable_object);
252255}
253256
257+ Status UnblockIfNeeded (
258+ const std::shared_ptr<ipc::RayletIpcClientInterface> &raylet_client,
259+ const WorkerContext &ctx) {
260+ if (ctx.CurrentTaskIsDirectCall ()) {
261+ // NOTE: for direct call actors, we still need to issue an unblock IPC to release
262+ // get subscriptions, even if the worker isn't blocked.
263+ if (ctx.ShouldReleaseResourcesOnBlockingCalls () || ctx.CurrentActorIsDirectCall ()) {
264+ return raylet_client->NotifyWorkerUnblocked ();
265+ } else {
266+ return Status::OK (); // We don't need to release resources.
267+ }
268+ } else {
269+ return raylet_client->CancelGetRequest ();
270+ }
271+ }
272+
254273Status CoreWorkerPlasmaStoreProvider::Get (
255274 const absl::flat_hash_set<ObjectID> &object_ids,
256275 int64_t timeout_ms,
257- absl::flat_hash_map<ObjectID, std::shared_ptr<RayObject>> *results) {
258- std::vector<ipc::ScopedResponse> get_request_cleanup_handlers;
259-
260- bool got_exception = false ;
261- absl::flat_hash_set<ObjectID> remaining (object_ids.begin (), object_ids.end ());
262- std::vector<ObjectID> id_vector (object_ids.begin (), object_ids.end ());
276+ const WorkerContext &ctx,
277+ absl::flat_hash_map<ObjectID, std::shared_ptr<RayObject>> *results,
278+ bool *got_exception) {
263279 std::vector<ObjectID> batch_ids;
280+ absl::flat_hash_set<ObjectID> remaining (object_ids.begin (), object_ids.end ());
264281
265- int64_t num_total_objects = static_cast <int64_t >(object_ids.size ());
266-
267- // TODO(57923): Need to understand if batching is necessary. If it's necessary,
268- // then the reason needs to be documented.
269- for (int64_t start = 0 ; start < num_total_objects; start += fetch_batch_size_) {
282+ // Send initial requests to pull all objects in parallel.
283+ std::vector<ObjectID> id_vector (object_ids.begin (), object_ids.end ());
284+ int64_t total_size = static_cast <int64_t >(object_ids.size ());
285+ for (int64_t start = 0 ; start < total_size; start += fetch_batch_size_) {
270286 batch_ids.clear ();
271- for (int64_t i = start; i < start + fetch_batch_size_ && i < num_total_objects ; i++) {
287+ for (int64_t i = start; i < start + fetch_batch_size_ && i < total_size ; i++) {
272288 batch_ids.push_back (id_vector[i]);
273289 }
274-
275- // 1. Make the request to pull all objects into local plasma if not local already.
276- std::vector<rpc::Address> owner_addresses =
277- reference_counter_.GetOwnerAddresses (batch_ids);
278- StatusOr<ipc::ScopedResponse> status_or_cleanup =
279- raylet_ipc_client_->AsyncGetObjects (batch_ids, owner_addresses);
280- RAY_RETURN_NOT_OK (status_or_cleanup.status ());
281- get_request_cleanup_handlers.emplace_back (std::move (status_or_cleanup.value ()));
282-
283- // 2. Try to Get all objects that are already local from the plasma store.
284290 RAY_RETURN_NOT_OK (
285- GetObjectsFromPlasmaStore (remaining,
286- batch_ids,
287- /* timeout_ms=*/ 0 ,
288- // Mutable objects must be local before ray.get.
289- results,
290- & got_exception));
291+ PullObjectsAndGetFromPlasmaStore (remaining,
292+ batch_ids,
293+ /* timeout_ms=*/ 0 ,
294+ // Mutable objects must be local before ray.get.
295+ results,
296+ got_exception));
291297 }
292298
293- if (remaining.empty () || got_exception) {
294- return Status::OK ();
299+ // If all objects were fetched already, return. Note that we always need to
300+ // call UnblockIfNeeded() to cancel the get request.
301+ if (remaining.empty () || *got_exception) {
302+ return UnblockIfNeeded (raylet_ipc_client_, ctx);
295303 }
296304
297- // 3. If not all objects were successfully fetched, repeatedly call
298- // GetObjectsFromPlasmaStore in batches. This loop will run indefinitely until the
299- // objects are all fetched if timeout is -1.
305+ // If not all objects were successfully fetched, repeatedly call
306+ // PullObjectsAndGetFromPlasmaStore in batches. This loop will run indefinitely
307+ // until the objects are all fetched if timeout is -1.
300308 bool should_break = false ;
301309 bool timed_out = false ;
302310 int64_t remaining_timeout = timeout_ms;
@@ -320,16 +328,18 @@ Status CoreWorkerPlasmaStoreProvider::Get(
320328 }
321329
322330 size_t previous_size = remaining.size ();
323- RAY_RETURN_NOT_OK (GetObjectsFromPlasmaStore (
324- remaining, batch_ids, batch_timeout, results, & got_exception));
325- should_break = timed_out || got_exception;
331+ RAY_RETURN_NOT_OK (PullObjectsAndGetFromPlasmaStore (
332+ remaining, batch_ids, batch_timeout, results, got_exception));
333+ should_break = timed_out || * got_exception;
326334
327335 if ((previous_size - remaining.size ()) < batch_ids.size ()) {
328336 WarnIfFetchHanging (fetch_start_time_ms, remaining);
329337 }
330338 if (check_signals_) {
331339 Status status = check_signals_ ();
332340 if (!status.ok ()) {
341+ // TODO(edoakes): in this case which status should we return?
342+ RAY_RETURN_NOT_OK (UnblockIfNeeded (raylet_ipc_client_, ctx));
333343 return status;
334344 }
335345 }
@@ -344,14 +354,13 @@ Status CoreWorkerPlasmaStoreProvider::Get(
344354 }
345355
346356 if (!remaining.empty () && timed_out) {
347- return Status::TimedOut (absl::StrFormat (
348- " Could not fetch %d objects within the timeout of %dms. %d objects were not "
349- " ready." ,
350- object_ids.size (),
351- timeout_ms,
352- remaining.size ()));
357+ RAY_RETURN_NOT_OK (UnblockIfNeeded (raylet_ipc_client_, ctx));
358+ return Status::TimedOut (" Get timed out: some object(s) not ready." );
353359 }
354- return Status::OK ();
360+
361+ // Notify unblocked because we blocked when calling FetchOrReconstruct with
362+ // fetch_only=false.
363+ return UnblockIfNeeded (raylet_ipc_client_, ctx);
355364}
356365
357366Status CoreWorkerPlasmaStoreProvider::Contains (const ObjectID &object_id,
0 commit comments