@@ -533,11 +533,10 @@ size_t TaskManager::NumPendingTasks() const {
533533 return num_pending_tasks_;
534534}
535535
536- Status TaskManager::HandleTaskReturn (const ObjectID &object_id,
537- const rpc::ReturnObject &return_object,
538- const NodeID &worker_node_id,
539- bool store_in_plasma,
540- bool *direct_return_out) {
536+ StatusOr<bool > TaskManager::HandleTaskReturn (const ObjectID &object_id,
537+ const rpc::ReturnObject &return_object,
538+ const NodeID &worker_node_id,
539+ bool store_in_plasma) {
541540 bool direct_return = false ;
542541 reference_counter_.UpdateObjectSize (object_id, return_object.size ());
543542 RAY_LOG (DEBUG) << " Task return object " << object_id << " has size "
@@ -581,6 +580,11 @@ Status TaskManager::HandleTaskReturn(const ObjectID &object_id,
581580 tensor_transport.value_or (rpc::TensorTransport::OBJECT_STORE));
582581 if (store_in_plasma) {
583582 Status s = put_in_local_plasma_callback_ (object, object_id);
583+ int retry_count = 0 ;
584+ while (!s.ok () && s.IsTransientObjectStoreFull () && retry_count < 3 ) {
585+ retry_count++;
586+ s = put_in_local_plasma_callback_ (object, object_id);
587+ }
584588 if (!s.ok ()) {
585589 return s;
586590 }
@@ -599,10 +603,7 @@ Status TaskManager::HandleTaskReturn(const ObjectID &object_id,
599603 }
600604 reference_counter_.AddNestedObjectIds (object_id, nested_ids, owner_address);
601605 }
602- if (direct_return_out != nullptr ) {
603- *direct_return_out = direct_return;
604- }
605- return Status::OK ();
606+ return direct_return;
606607}
607608
608609bool TaskManager::TryDelObjectRefStream (const ObjectID &generator_id) {
@@ -820,13 +821,15 @@ bool TaskManager::HandleReportGeneratorItemReturns(
820821 }
821822 // When an object is reported, the object is ready to be fetched.
822823 reference_counter_.UpdateObjectPendingCreation (object_id, false );
823- bool _direct_unused = false ;
824- RAY_UNUSED (
824+ StatusOr<bool > put_res =
825825 HandleTaskReturn (object_id,
826826 return_object,
827827 NodeID::FromBinary (request.worker_addr ().node_id ()),
828- /* store_in_plasma=*/ store_in_plasma_ids.contains (object_id),
829- &_direct_unused));
828+ /* store_in_plasma=*/ store_in_plasma_ids.contains (object_id));
829+ if (!put_res.ok ()) {
830+ RAY_LOG (WARNING).WithField (object_id)
831+ << " Failed to handle streaming dynamic return: " << put_res.status ();
832+ }
830833 }
831834
832835 // Handle backpressure if needed.
@@ -910,41 +913,49 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
910913 reference_counter_.AddDynamicReturn (object_id, generator_id);
911914 dynamic_return_ids.push_back (object_id);
912915 }
913- bool direct = false ;
914- Status s = HandleTaskReturn (object_id,
915- return_object,
916- NodeID::FromBinary (worker_addr.node_id ()),
917- store_in_plasma_ids.contains (object_id),
918- &direct);
919- if (!s.ok ()) {
916+ StatusOr<bool > direct_or =
917+ HandleTaskReturn (object_id,
918+ return_object,
919+ NodeID::FromBinary (worker_addr.node_id ()),
920+ store_in_plasma_ids.contains (object_id));
921+ if (!direct_or.ok ()) {
920922 RAY_LOG (WARNING).WithField (object_id)
921- << " Failed to handle dynamic task return: " << s;
922- } else if (!direct && first_execution) {
923+ << " Failed to handle dynamic task return: " << direct_or.status ();
924+ // Treat as system failure for this attempt and fail immediately to avoid hangs.
925+ Status st = direct_or.status ();
926+ FailOrRetryPendingTask (task_id,
927+ rpc::ErrorType::WORKER_DIED,
928+ &st,
929+ /* ray_error_info=*/ nullptr ,
930+ /* mark_task_object_failed=*/ true ,
931+ /* fail_immediately=*/ true );
932+ return ;
933+ } else if (!direct_or.value () && first_execution) {
923934 dynamic_returns_in_plasma.push_back (object_id);
924935 }
925936 }
926937 }
927938
928939 for (const auto &return_object : reply.return_objects ()) {
929940 const auto object_id = ObjectID::FromBinary (return_object.object_id ());
930- bool direct = false ;
931- Status s = HandleTaskReturn (object_id,
932- return_object,
933- NodeID::FromBinary (worker_addr.node_id ()),
934- store_in_plasma_ids.contains (object_id),
935- &direct);
936- if (!s.ok ()) {
937- RAY_LOG (WARNING).WithField (object_id) << " Failed to handle task return: " << s;
941+ StatusOr<bool > direct_or = HandleTaskReturn (object_id,
942+ return_object,
943+ NodeID::FromBinary (worker_addr.node_id ()),
944+ store_in_plasma_ids.contains (object_id));
945+ if (!direct_or.ok ()) {
946+ RAY_LOG (WARNING).WithField (object_id)
947+ << " Failed to handle task return: " << direct_or.status ();
938948 // If storing return in plasma failed, treat as system failure for this attempt.
939949 // Do not proceed with normal completion. Mark task failed immediately.
950+ Status st = direct_or.status ();
940951 FailOrRetryPendingTask (task_id,
941952 rpc::ErrorType::WORKER_DIED,
942- &s ,
953+ &st ,
943954 /* ray_error_info=*/ nullptr ,
944955 /* mark_task_object_failed=*/ true ,
945956 /* fail_immediately=*/ true );
946957 return ;
947- } else if (direct ) {
958+ } else if (direct_or. value () ) {
948959 direct_return_ids.push_back (object_id);
949960 }
950961 }
@@ -1068,12 +1079,16 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
10681079 const auto generator_return_id = spec.StreamingGeneratorReturnId (i);
10691080 RAY_CHECK_EQ (reply.return_objects_size (), 1 );
10701081 const auto &return_object = reply.return_objects (0 );
1071- bool _direct_unused = false ;
1072- RAY_UNUSED (HandleTaskReturn (generator_return_id,
1073- return_object,
1074- NodeID::FromBinary (worker_addr.node_id ()),
1075- store_in_plasma_ids.contains (generator_return_id),
1076- &_direct_unused));
1082+ StatusOr<bool > res =
1083+ HandleTaskReturn (generator_return_id,
1084+ return_object,
1085+ NodeID::FromBinary (worker_addr.node_id ()),
1086+ store_in_plasma_ids.contains (generator_return_id));
1087+ if (!res.ok ()) {
1088+ RAY_LOG (WARNING).WithField (generator_return_id)
1089+ << " Failed to handle generator return during app error propagation: "
1090+ << res.status ();
1091+ }
10771092 }
10781093 }
10791094 }
@@ -1484,26 +1499,26 @@ void TaskManager::MarkTaskReturnObjectsFailed(
14841499 int64_t num_returns = spec.NumReturns ();
14851500 for (int i = 0 ; i < num_returns; i++) {
14861501 const auto object_id = ObjectID::FromIndex (task_id, /* index=*/ i + 1 );
1502+ // Always place an error marker in local memory to unblock waiters quickly.
1503+ in_memory_store_.Put (error, object_id);
1504+ // Best-effort plasma put if the object was meant to be in plasma.
14871505 if (store_in_plasma_ids.contains (object_id)) {
14881506 Status s = put_in_local_plasma_callback_ (error, object_id);
14891507 if (!s.ok ()) {
14901508 RAY_LOG (WARNING).WithField (object_id)
14911509 << " Failed to put error object in plasma: " << s;
14921510 }
1493- } else {
1494- in_memory_store_.Put (error, object_id);
14951511 }
14961512 }
14971513 if (spec.ReturnsDynamic ()) {
14981514 for (const auto &dynamic_return_id : spec.DynamicReturnIds ()) {
1515+ in_memory_store_.Put (error, dynamic_return_id);
14991516 if (store_in_plasma_ids.contains (dynamic_return_id)) {
15001517 Status s = put_in_local_plasma_callback_ (error, dynamic_return_id);
15011518 if (!s.ok ()) {
15021519 RAY_LOG (WARNING).WithField (dynamic_return_id)
15031520 << " Failed to put error object in plasma: " << s;
15041521 }
1505- } else {
1506- in_memory_store_.Put (error, dynamic_return_id);
15071522 }
15081523 }
15091524 }
0 commit comments