Skip to content

Commit

Permalink
[1/N] Streaming Generator. Cpp interfaces and implementation (ray-pro…
Browse files Browse the repository at this point in the history
…ject#35291)

This PR introduces TaskManager interfaces to enable streaming generator.
  • Loading branch information
rkooo567 authored May 18, 2023
1 parent dedc47d commit bfec451
Show file tree
Hide file tree
Showing 21 changed files with 1,199 additions and 75 deletions.
5 changes: 2 additions & 3 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -599,8 +599,7 @@ cdef store_task_errors(
CTaskType task_type,
proctitle,
c_vector[c_pair[CObjectID, shared_ptr[CRayObject]]] *returns,
c_string* application_error,
):
c_string* application_error):
cdef:
CoreWorker core_worker = worker.core_worker

Expand Down Expand Up @@ -655,6 +654,7 @@ cdef store_task_errors(
raise RayActorError.from_task_error(failure_object)
return num_errors_stored


cdef execute_dynamic_generator_and_store_task_outputs(
generator,
const CObjectID &generator_id,
Expand Down Expand Up @@ -995,7 +995,6 @@ cdef void execute_task(

# Store the outputs in the object store.
with core_worker.profile_event(b"task:store_outputs"):
num_returns = returns[0].size()
if dynamic_returns != NULL:
if not inspect.isgenerator(outputs):
raise ValueError(
Expand Down
9 changes: 7 additions & 2 deletions python/ray/includes/libcoreworker.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
c_bool PinExistingReturnObject(
const CObjectID& return_id,
shared_ptr[CRayObject] *return_object,
const CObjectID& generator_id
)
const CObjectID& generator_id)
CObjectID AllocateDynamicReturnId()

CJobID GetCurrentJobId()
Expand Down Expand Up @@ -237,6 +236,12 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
int64_t timeout_ms,
c_vector[shared_ptr[CObjectLocation]] *results)
CRayStatus TriggerGlobalGC()
CRayStatus ReportGeneratorItemReturns(
const pair[CObjectID, shared_ptr[CRayObject]] &dynamic_return_object,
const CObjectID &generator_id,
const CAddress &caller_address,
int64_t item_index,
c_bool finished)
c_string MemoryUsageString()

CWorkerContext &GetWorkerContext()
Expand Down
1 change: 1 addition & 0 deletions python/ray/remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ def _remote(self, args=None, kwargs=None, **task_options):
num_returns = task_options["num_returns"]
if num_returns == "dynamic":
num_returns = -1

max_retries = task_options["max_retries"]
retry_exceptions = task_options["retry_exceptions"]
if isinstance(retry_exceptions, (list, tuple)):
Expand Down
8 changes: 5 additions & 3 deletions python/ray/tests/test_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ def read(gen):
gen = ray.get(
remote_generator_fn.options(num_returns="dynamic").remote(0, store_in_plasma)
)
assert len(gen) == 0
assert len(list(gen)) == 0

# Check that passing as task arg.
gen = remote_generator_fn.options(num_returns="dynamic").remote(10, store_in_plasma)
Expand All @@ -284,7 +284,9 @@ def static(num_returns):
return list(range(num_returns))

with pytest.raises(ray.exceptions.RayTaskError):
ray.get(static.remote(3))
gen = ray.get(static.remote(3))
for ref in gen:
ray.get(ref)


def test_dynamic_generator_distributed(ray_start_cluster):
Expand Down Expand Up @@ -535,7 +537,7 @@ def maybe_empty_generator(exec_counter):

@ray.remote
def check(empty_generator):
return len(empty_generator) == 0
return len(list(empty_generator)) == 0

exec_counter = ExecutionCounter.remote()
gen = maybe_empty_generator.remote(exec_counter)
Expand Down
8 changes: 7 additions & 1 deletion src/ray/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ enum class StatusCode : char {
OutOfDisk = 28,
ObjectUnknownOwner = 29,
RpcError = 30,
OutOfResource = 31
OutOfResource = 31,
ObjectRefStreamEoF = 32
};

#if defined(__clang__)
Expand Down Expand Up @@ -146,6 +147,10 @@ class RAY_EXPORT Status {
return Status(StatusCode::KeyError, msg);
}

static Status ObjectRefStreamEoF(const std::string &msg) {
return Status(StatusCode::ObjectRefStreamEoF, msg);
}

static Status TypeError(const std::string &msg) {
return Status(StatusCode::TypeError, msg);
}
Expand Down Expand Up @@ -254,6 +259,7 @@ class RAY_EXPORT Status {
bool IsOutOfMemory() const { return code() == StatusCode::OutOfMemory; }
bool IsOutOfDisk() const { return code() == StatusCode::OutOfDisk; }
bool IsKeyError() const { return code() == StatusCode::KeyError; }
bool IsObjectRefStreamEoF() const { return code() == StatusCode::ObjectRefStreamEoF; }
bool IsInvalid() const { return code() == StatusCode::Invalid; }
bool IsIOError() const { return code() == StatusCode::IOError; }
bool IsTypeError() const { return code() == StatusCode::TypeError; }
Expand Down
30 changes: 30 additions & 0 deletions src/ray/core_worker/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,35 @@ std::string GenerateCachedActorName(const std::string &ns,
return ns + "-" + actor_name;
}

void SerializeReturnObject(const ObjectID &object_id,
const std::shared_ptr<RayObject> &return_object,
rpc::ReturnObject *return_object_proto) {
return_object_proto->set_object_id(object_id.Binary());

if (!return_object) {
// This should only happen if the local raylet died. Caller should
// retry the task.
RAY_LOG(WARNING) << "Failed to create task return object " << object_id
<< " in the object store, exiting.";
QuickExit();
}
return_object_proto->set_size(return_object->GetSize());
if (return_object->GetData() != nullptr && return_object->GetData()->IsPlasmaBuffer()) {
return_object_proto->set_in_plasma(true);
} else {
if (return_object->GetData() != nullptr) {
return_object_proto->set_data(return_object->GetData()->Data(),
return_object->GetData()->Size());
}
if (return_object->GetMetadata() != nullptr) {
return_object_proto->set_metadata(return_object->GetMetadata()->Data(),
return_object->GetMetadata()->Size());
}
}
for (const auto &nested_ref : return_object->GetNestedRefs()) {
return_object_proto->add_nested_inlined_refs()->CopyFrom(nested_ref);
}
}

} // namespace core
} // namespace ray
5 changes: 5 additions & 0 deletions src/ray/core_worker/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "ray/common/task/task_spec.h"
#include "ray/raylet_client/raylet_client.h"
#include "ray/util/util.h"
#include "src/ray/protobuf/common.pb.h"

namespace ray {
namespace core {
Expand All @@ -37,6 +38,10 @@ std::string LanguageString(Language language);
// `namespace-[job_id-]actor_name`
std::string GenerateCachedActorName(const std::string &ns, const std::string &actor_name);

void SerializeReturnObject(const ObjectID &object_id,
const std::shared_ptr<RayObject> &return_object,
rpc::ReturnObject *return_object_proto);

/// Information about a remote function.
class RayFunction {
public:
Expand Down
56 changes: 56 additions & 0 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2613,6 +2613,9 @@ Status CoreWorker::ExecuteTask(
dynamic_return_objects = NULL;
} else if (task_spec.AttemptNumber() > 0) {
for (const auto &dynamic_return_id : task_spec.DynamicReturnIds()) {
// Increase the put index so that when the generator creates a new obj
// the object id won't conflict.
worker_context_.GetNextPutIndex();
dynamic_return_objects->push_back(
std::make_pair<>(dynamic_return_id, std::shared_ptr<RayObject>()));
RAY_LOG(DEBUG) << "Re-executed task " << task_spec.TaskId()
Expand Down Expand Up @@ -2829,6 +2832,59 @@ ObjectID CoreWorker::AllocateDynamicReturnId() {
return return_id;
}

Status CoreWorker::ReportGeneratorItemReturns(
const std::pair<ObjectID, std::shared_ptr<RayObject>> &dynamic_return_object,
const ObjectID &generator_id,
const rpc::Address &caller_address,
int64_t item_index,
bool finished) {
RAY_LOG(DEBUG) << "Write the object ref stream, index: " << item_index
<< " finished: " << finished << ", id: " << dynamic_return_object.first;
rpc::ReportGeneratorItemReturnsRequest request;
request.mutable_worker_addr()->CopyFrom(rpc_address_);
request.set_item_index(item_index);
request.set_finished(finished);
request.set_generator_id(generator_id.Binary());
auto client = core_worker_client_pool_->GetOrConnect(caller_address);

if (!dynamic_return_object.first.IsNil()) {
RAY_CHECK_EQ(finished, false);
auto return_object_proto = request.add_dynamic_return_objects();
SerializeReturnObject(
dynamic_return_object.first, dynamic_return_object.second, return_object_proto);
std::vector<ObjectID> deleted;
// When we allocate a dynamic return ID (AllocateDynamicReturnId),
// we borrow the object. When the object value is allocatd, the
// memory store is updated. We should clear borrowers and memory store
// here.
ReferenceCounter::ReferenceTableProto borrowed_refs;
reference_counter_->PopAndClearLocalBorrowers(
{dynamic_return_object.first}, &borrowed_refs, &deleted);
memory_store_->Delete(deleted);
} else {
// fininshed must be set when dynamic_return_object is nil.
RAY_CHECK_EQ(finished, true);
}

client->ReportGeneratorItemReturns(
request,
[](const Status &status, const rpc::ReportGeneratorItemReturnsReply &reply) {
if (!status.ok()) {
// TODO(sang): Handle network error more gracefully.
RAY_LOG(ERROR) << "Failed to send the object ref.";
}
});
return Status::OK();
}

void CoreWorker::HandleReportGeneratorItemReturns(
rpc::ReportGeneratorItemReturnsRequest request,
rpc::ReportGeneratorItemReturnsReply *reply,
rpc::SendReplyCallback send_reply_callback) {
task_manager_->HandleReportGeneratorItemReturns(request);
send_reply_callback(Status::OK(), nullptr, nullptr);
}

std::vector<rpc::ObjectReference> CoreWorker::ExecuteTaskLocalMode(
const TaskSpecification &task_spec, const ActorID &actor_id) {
auto resource_ids = std::make_shared<ResourceMappingType>();
Expand Down
44 changes: 44 additions & 0 deletions src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
std::vector<ObjectID> deleted;
reference_counter_->RemoveLocalReference(object_id, &deleted);
// TOOD(ilr): better way of keeping an object from being deleted
// TODO(sang): This seems bad... We should delete the memory store
// properly from reference counter.
if (!options_.is_local_mode) {
memory_store_->Delete(deleted);
}
Expand Down Expand Up @@ -704,6 +706,48 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// Trigger garbage collection on each worker in the cluster.
void TriggerGlobalGC();

/// Report the task caller at caller_address that the intermediate
/// task return. It means if this API is used, the caller will be notified
/// the task return before the current task is terminated. The caller must
/// implement HandleReportGeneratorItemReturns API endpoint
/// to handle the intermediate result report.
/// This API makes sense only for a generator task
/// (task that can return multiple intermediate
/// result before the task terminates).
///
/// NOTE: The API doesn't guarantee the ordering of the report. The
/// caller is supposed to reorder the report based on the item_index.
///
/// \param[in] dynamic_return_object A intermediate ray object to report
/// to the caller before the task terminates. This object must have been
/// created dynamically from this worker via AllocateReturnObject.
/// If the Object ID is nil, it means it is the end of the task return.
/// In this case, the caller is responsible for setting finished = true,
/// otherwise it will panic.
/// \param[in] generator_id The return object ref ID from a current generator
/// task.
/// \param[in] caller_address The address of the caller of the current task
/// that created a generator_id.
/// \param[in] item_index The index of the task return. It is used to reorder the
/// report from the caller side.
/// \param[in] finished True indicates there's going to be no more intermediate
/// task return. When finished is provided dynamic_return_object's key must be
/// pair<nil, empty_pointer>
Status ReportGeneratorItemReturns(
const std::pair<ObjectID, std::shared_ptr<RayObject>> &dynamic_return_object,
const ObjectID &generator_id,
const rpc::Address &caller_address,
int64_t item_index,
bool finished);

/// Implements gRPC server handler.
/// If an executor can generator task return before the task is finished,
/// it invokes this endpoint via ReportGeneratorItemReturns RPC.
void HandleReportGeneratorItemReturns(
rpc::ReportGeneratorItemReturnsRequest request,
rpc::ReportGeneratorItemReturnsReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

/// Get a string describing object store memory usage for debugging purposes.
///
/// \return std::string The string describing memory usage.
Expand Down
49 changes: 44 additions & 5 deletions src/ray/core_worker/reference_count.cc
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,40 @@ void ReferenceCounter::AddDynamicReturn(const ObjectID &object_id,
AddNestedObjectIdsInternal(generator_id, {object_id}, owner_address);
}

void ReferenceCounter::OwnDynamicStreamingTaskReturnRef(const ObjectID &object_id,
const ObjectID &generator_id) {
absl::MutexLock lock(&mutex_);
// NOTE: The upper layer (the layer that manges the object ref stream)
// should make sure the generator ref is not GC'ed until the
// stream is deleted.
auto outer_it = object_id_refs_.find(generator_id);
if (outer_it == object_id_refs_.end()) {
// Generator object already went out of scope.
// It means the generator is already GC'ed. No need to
// update the reference.
RAY_LOG(DEBUG)
<< "Ignore OwnDynamicStreamingTaskReturnRef. The dynamic return reference "
<< object_id << " is registered after the generator id " << generator_id
<< " went out of scope.";
return;
}
RAY_LOG(DEBUG) << "Adding dynamic return " << object_id
<< " contained in generator object " << generator_id;
RAY_CHECK(outer_it->second.owned_by_us);
RAY_CHECK(outer_it->second.owner_address.has_value());
rpc::Address owner_address(outer_it->second.owner_address.value());
// We add a local reference here. The ref removal will be handled
// by the ObjectRefStream.
RAY_UNUSED(AddOwnedObjectInternal(object_id,
{},
owner_address,
outer_it->second.call_site,
/*object_size=*/-1,
outer_it->second.is_reconstructable,
/*add_local_ref=*/true,
absl::optional<NodeID>()));
}

bool ReferenceCounter::AddOwnedObjectInternal(
const ObjectID &object_id,
const std::vector<ObjectID> &inner_ids,
Expand Down Expand Up @@ -382,7 +416,7 @@ void ReferenceCounter::UpdateSubmittedTaskReferences(
std::vector<ObjectID> *deleted) {
absl::MutexLock lock(&mutex_);
for (const auto &return_id : return_ids) {
UpdateObjectPendingCreation(return_id, true);
UpdateObjectPendingCreationInternal(return_id, true);
}
for (const ObjectID &argument_id : argument_ids_to_add) {
RAY_LOG(DEBUG) << "Increment ref count for submitted task argument " << argument_id;
Expand Down Expand Up @@ -411,7 +445,7 @@ void ReferenceCounter::UpdateResubmittedTaskReferences(
const std::vector<ObjectID> return_ids, const std::vector<ObjectID> &argument_ids) {
absl::MutexLock lock(&mutex_);
for (const auto &return_id : return_ids) {
UpdateObjectPendingCreation(return_id, true);
UpdateObjectPendingCreationInternal(return_id, true);
}
for (const ObjectID &argument_id : argument_ids) {
auto it = object_id_refs_.find(argument_id);
Expand All @@ -433,7 +467,7 @@ void ReferenceCounter::UpdateFinishedTaskReferences(
std::vector<ObjectID> *deleted) {
absl::MutexLock lock(&mutex_);
for (const auto &return_id : return_ids) {
UpdateObjectPendingCreation(return_id, false);
UpdateObjectPendingCreationInternal(return_id, false);
}
// Must merge the borrower refs before decrementing any ref counts. This is
// to make sure that for serialized IDs, we increment the borrower count for
Expand Down Expand Up @@ -1278,8 +1312,8 @@ void ReferenceCounter::RemoveObjectLocationInternal(ReferenceTable::iterator it,
PushToLocationSubscribers(it);
}

void ReferenceCounter::UpdateObjectPendingCreation(const ObjectID &object_id,
bool pending_creation) {
void ReferenceCounter::UpdateObjectPendingCreationInternal(const ObjectID &object_id,
bool pending_creation) {
auto it = object_id_refs_.find(object_id);
bool push = false;
if (it != object_id_refs_.end()) {
Expand Down Expand Up @@ -1439,6 +1473,11 @@ bool ReferenceCounter::IsObjectReconstructable(const ObjectID &object_id,
return it->second.is_reconstructable;
}

void ReferenceCounter::UpdateObjectReady(const ObjectID &object_id) {
absl::MutexLock lock(&mutex_);
UpdateObjectPendingCreationInternal(object_id, /*pending_creation*/ false);
}

bool ReferenceCounter::IsObjectPendingCreation(const ObjectID &object_id) const {
absl::MutexLock lock(&mutex_);
auto it = object_id_refs_.find(object_id);
Expand Down
Loading

0 comments on commit bfec451

Please sign in to comment.