-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support inlining task arguments #72
Changes from all commits
30d2abe
1dd4e56
57cb3fa
f36b06a
5185e71
c8e7875
f572822
1f40349
feb71cf
a0852b0
a48db48
9e27f4c
35b7e4c
68b5372
2344b1f
026b152
b5738cb
2727fd3
48b1b65
7623be0
64c71cb
327dd2b
cf8da68
8eec7a3
c66bea3
399c917
987e22a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -140,7 +140,7 @@ std::vector<std::shared_ptr<RayObject>> cast_to_task_args(void *ptr) { | |
} | ||
|
||
ObjectID _submit_task(const ray::JuliaFunctionDescriptor &jl_func_descriptor, | ||
const std::vector<ObjectID> &object_ids, | ||
const std::vector<TaskArg *> &task_args, | ||
const std::string &serialized_runtime_env_info, | ||
const std::unordered_map<std::string, double> &resources) { | ||
|
||
|
@@ -149,9 +149,11 @@ ObjectID _submit_task(const ray::JuliaFunctionDescriptor &jl_func_descriptor, | |
ray::FunctionDescriptor func_descriptor = std::make_shared<ray::JuliaFunctionDescriptor>(jl_func_descriptor); | ||
RayFunction func(Language::JULIA, func_descriptor); | ||
|
||
// TODO: Passing in a `std::vector<std::unique_ptr<TaskArg>>` from Julia may currently be impossible due to: | ||
// https://github.com/JuliaInterop/CxxWrap.jl/issues/370 | ||
std::vector<std::unique_ptr<TaskArg>> args; | ||
for (auto & obj_id : object_ids) { | ||
args.emplace_back(new TaskArgByReference(obj_id, worker.GetRpcAddress(), /*call-site*/"")); | ||
for (auto &task_arg : task_args) { | ||
args.emplace_back(task_arg); | ||
Comment on lines
+155
to
+156
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. since the element type is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah wait I always forget, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Just to avoid any possible confusion the |
||
} | ||
|
||
// TaskOptions: https://github.com/ray-project/ray/blob/ray-2.5.1/src/ray/core_worker/common.h#L62-L87 | ||
|
@@ -355,16 +357,24 @@ std::unordered_map<std::string, double> get_task_required_resources() { | |
return worker_context.GetCurrentTask()->GetRequiredResources().GetResourceUnorderedMap(); | ||
} | ||
|
||
// Appends the raw `TaskArg` pointer `el` to the end of `vector`. | ||
// Only the pointer value is stored; nothing is copied or freed here. | ||
// NOTE(review): presumably exists so Julia can fill a `std::vector<TaskArg *>` | ||
// element by element (registered as "_push_back" in the module) — confirm. | ||
void _push_back(std::vector<TaskArg *> &vector, TaskArg *el) { | ||
vector.push_back(el); | ||
} | ||
|
||
namespace jlcxx | ||
{ | ||
// Needed for upcasting | ||
template<> struct SuperType<LocalMemoryBuffer> { typedef Buffer type; }; | ||
template<> struct SuperType<JuliaFunctionDescriptor> { typedef FunctionDescriptorInterface type; }; | ||
template<> struct SuperType<TaskArgByReference> { typedef TaskArg type; }; | ||
template<> struct SuperType<TaskArgByValue> { typedef TaskArg type; }; | ||
|
||
// Disable generated constructors | ||
// https://github.com/JuliaInterop/CxxWrap.jl/issues/141#issuecomment-491373720 | ||
template<> struct DefaultConstructible<LocalMemoryBuffer> : std::false_type {}; | ||
template<> struct DefaultConstructible<RayObject> : std::false_type {}; | ||
// template<> struct DefaultConstructible<JuliaFunctionDescriptor> : std::false_type {}; | ||
template<> struct DefaultConstructible<TaskArg> : std::false_type {}; | ||
|
||
// Custom finalizer to show what is being deleted. Can be useful in tracking down | ||
// segmentation faults due to double deallocations | ||
|
@@ -420,12 +430,6 @@ JLCXX_MODULE define_julia_module(jlcxx::Module& mod) | |
.method("Binary", &TaskID::Binary) | ||
.method("Hex", &TaskID::Hex); | ||
|
||
// https://github.com/ray-project/ray/blob/ray-2.5.1/src/ray/core_worker/core_worker.h#L284 | ||
mod.add_type<ray::core::CoreWorker>("CoreWorker") | ||
.method("GetCurrentJobId", &ray::core::CoreWorker::GetCurrentJobId) | ||
.method("GetCurrentTaskId", &ray::core::CoreWorker::GetCurrentTaskId); | ||
mod.method("_GetCoreWorker", &_GetCoreWorker); | ||
|
||
mod.method("initialize_driver", &initialize_driver); | ||
mod.method("shutdown_driver", &shutdown_driver); | ||
mod.method("initialize_worker", &initialize_worker); | ||
|
@@ -497,15 +501,51 @@ JLCXX_MODULE define_julia_module(jlcxx::Module& mod) | |
|
||
mod.method("put", &put); | ||
mod.method("get", &get); | ||
mod.method("_submit_task", &_submit_task); | ||
|
||
// message Address | ||
// https://github.com/ray-project/ray/blob/ray-2.5.1/src/ray/protobuf/common.proto#L86 | ||
mod.add_type<rpc::Address>("Address") | ||
.constructor<>() | ||
.method("SerializeToString", [](const rpc::Address &addr) { | ||
std::string serialized; | ||
addr.SerializeToString(&serialized); | ||
return serialized; | ||
}) | ||
.method("MessageToJsonString", [](const rpc::Address &addr) { | ||
std::string json; | ||
google::protobuf::util::MessageToJsonString(addr, &json); | ||
return json; | ||
}); | ||
|
||
// https://github.com/ray-project/ray/blob/ray-2.5.1/src/ray/core_worker/core_worker.h#L284 | ||
mod.add_type<ray::core::CoreWorker>("CoreWorker") | ||
.method("GetCurrentJobId", &ray::core::CoreWorker::GetCurrentJobId) | ||
.method("GetCurrentTaskId", &ray::core::CoreWorker::GetCurrentTaskId) | ||
.method("GetRpcAddress", &ray::core::CoreWorker::GetRpcAddress); | ||
mod.method("_GetCoreWorker", &_GetCoreWorker); | ||
|
||
// message ObjectReference | ||
// https://github.com/ray-project/ray/blob/ray-2.5.1/src/ray/protobuf/common.proto#L500 | ||
mod.add_type<rpc::ObjectReference>("ObjectReference"); | ||
jlcxx::stl::apply_stl<rpc::ObjectReference>(mod); | ||
|
||
// class RayObject | ||
// https://github.com/ray-project/ray/blob/ray-2.5.1/src/ray/common/ray_object.h#L28 | ||
mod.add_type<RayObject>("RayObject") | ||
.method("GetData", &RayObject::GetData); | ||
|
||
// Julia RayObject constructors make shared_ptrs | ||
mod.method("RayObject", [] ( | ||
const std::shared_ptr<Buffer> &data, | ||
const std::shared_ptr<Buffer> &metadata, | ||
const std::vector<rpc::ObjectReference> &nested_refs, | ||
bool copy_data = false) { | ||
|
||
return std::make_shared<RayObject>(data, metadata, nested_refs, copy_data); | ||
}); | ||
mod.method("RayObject", [] (const std::shared_ptr<Buffer> &data) { | ||
return std::make_shared<RayObject>(data, nullptr, std::vector<rpc::ObjectReference>(), false); | ||
}); | ||
jlcxx::stl::apply_stl<std::shared_ptr<RayObject>>(mod); | ||
|
||
mod.add_type<Status>("Status") | ||
|
@@ -536,4 +576,50 @@ JLCXX_MODULE define_julia_module(jlcxx::Module& mod) | |
mod.method("serialize_job_config_json", &serialize_job_config_json); | ||
mod.method("get_job_serialized_runtime_env", &get_job_serialized_runtime_env); | ||
mod.method("get_task_required_resources", &get_task_required_resources); | ||
|
||
// class RayConfig | ||
// https://github.com/ray-project/ray/blob/ray-2.5.1/src/ray/common/ray_config.h#L60 | ||
// | ||
// Lambdas required here as otherwise we see the following error: | ||
// "error: call to non-static member function without an object argument" | ||
mod.add_type<RayConfig>("RayConfig") | ||
.method("RayConfigInstance", &RayConfig::instance) | ||
.method("max_direct_call_object_size", [](RayConfig &config) { | ||
return config.max_direct_call_object_size(); | ||
}) | ||
.method("task_rpc_inlined_bytes_limit", [](RayConfig &config) { | ||
return config.task_rpc_inlined_bytes_limit(); | ||
}) | ||
.method("record_ref_creation_sites", [](RayConfig &config) { | ||
return config.record_ref_creation_sites(); | ||
}); | ||
Comment on lines
+587
to
+595
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do these need to be lambdas? seem pretty straightforward... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure now. I'll try this without the lambdas There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fails with this:
|
||
|
||
// https://github.com/ray-project/ray/blob/ray-2.5.1/src/ray/common/task/task_util.h | ||
mod.add_type<TaskArg>("TaskArg"); | ||
mod.method("_push_back", &_push_back); | ||
|
||
// The Julia types `TaskArgByReference` and `TaskArgByValue` have their default finalizers | ||
// disabled as these will later be used as `std::unique_ptr`. If these finalizers were enabled | ||
// we would see segmentation faults due to double deallocations. | ||
// | ||
// Note: It is possible to create `std::unique_ptr`s in C++ and return them to Julia however | ||
// CxxWrap is unable to compile any wrapped functions using `std::vector<std::unique_ptr<TaskArg>>`. | ||
// We're working around this by using `std::vector<TaskArg *>`. | ||
// https://github.com/JuliaInterop/CxxWrap.jl/issues/370 | ||
|
||
mod.add_type<TaskArgByReference>("TaskArgByReference", jlcxx::julia_base_type<TaskArg>()) | ||
.constructor<const ObjectID &/*object_id*/, | ||
const rpc::Address &/*owner_address*/, | ||
const std::string &/*call_site*/>(false) | ||
.method("unique_ptr", [](TaskArgByReference *t) { | ||
return std::unique_ptr<TaskArgByReference>(t); | ||
}); | ||
|
||
mod.add_type<TaskArgByValue>("TaskArgByValue", jlcxx::julia_base_type<TaskArg>()) | ||
.constructor<const std::shared_ptr<RayObject> &/*value*/>(false) | ||
.method("unique_ptr", [](TaskArgByValue *t) { | ||
return std::unique_ptr<TaskArgByValue>(t); | ||
}); | ||
|
||
mod.method("_submit_task", &_submit_task); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we be doing this though? The values will be automatically dereferenced on the task execution side, so I'm not so sure; then again, if we don't do it then users may be surprised if they do have to dereference in their work functions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Python code I'm referring to is attempting to avoid creating a new `ObjectRef` here if the user has passed in one as an argument. In all cases Python will dereference the arguments inside the called task. What I was specifically calling out here is that due to `flatten_args` this optimization will never occur, as an `arg` is never an `ObjectRef`. At best it would be a `list` containing an `ObjectRef` as the second value.