diff --git a/ray_julia_jll/deps/wrapper.cc b/ray_julia_jll/deps/wrapper.cc index 8c2a638b..c120976b 100644 --- a/ray_julia_jll/deps/wrapper.cc +++ b/ray_julia_jll/deps/wrapper.cc @@ -104,7 +104,7 @@ void initialize_worker( bool is_reattempt, bool is_streaming_generator) { - std::vector> return_vec; + std::vector> return_vec; task_executor(ray_function, &return_vec, // implicity converts to void * &args, // implicity converts to void * @@ -116,8 +116,7 @@ void initialize_worker( // TODO: support multiple return values // https://github.com/beacon-biosignals/Ray.jl/issues/54 - std::shared_ptr buffer = return_vec[0]; - (*returns)[0].second = std::make_shared(buffer, nullptr, std::vector(), false); + (*returns)[0].second = return_vec[0]; return Status::OK(); }; RAY_LOG(DEBUG) << "ray_julia_jll: Initializing julia worker coreworker"; @@ -129,9 +128,9 @@ void initialize_worker( RAY_LOG(DEBUG) << "ray_julia_jll: Task execution loop exited"; } -std::vector> * cast_to_returns(void *ptr) { - auto buffer_ptr = static_cast> *>(ptr); - return buffer_ptr; +std::vector> *cast_to_returns(void *ptr) { + auto rayobj_ptr = static_cast> *>(ptr); + return rayobj_ptr; } std::vector> cast_to_task_args(void *ptr) { diff --git a/ray_julia_jll/src/wrappers/any.jl b/ray_julia_jll/src/wrappers/any.jl index 3a22ad9b..278ac9bd 100644 --- a/ray_julia_jll/src/wrappers/any.jl +++ b/ray_julia_jll/src/wrappers/any.jl @@ -171,7 +171,7 @@ function Base.take!(buffer::CxxWrap.CxxWrapCore.SmartPointer{<:Buffer}) end # Work around this: https://github.com/JuliaInterop/CxxWrap.jl/issues/300 -function Base.push!(v::CxxPtr{StdVector{T}}, el::T) where T <: SharedPtr{LocalMemoryBuffer} +function Base.push!(v::CxxPtr{StdVector{T}}, el::T) where T <: SharedPtr{RayObject} return push!(v, CxxRef(el)) end diff --git a/src/runtime.jl b/src/runtime.jl index 8db2efde..7b11ed04 100644 --- a/src/runtime.jl +++ b/src/runtime.jl @@ -269,6 +269,7 @@ function task_executor(ray_function, returns_ptr, task_args_ptr, task_name, application_error, is_retryable_error) returns = ray_jll.cast_to_returns(returns_ptr) task_args = ray_jll.cast_to_task_args(task_args_ptr) + worker = ray_jll.GetCoreWorker() local result try @@ -321,9 +322,17 @@ function task_executor(ray_function, returns_ptr, task_args_ptr, task_name, # TODO: support multiple return values # https://github.com/beacon-biosignals/Ray.jl/issues/54 - bytes = serialize_to_bytes(result) + bytes = Vector{UInt8}() + serializer = RaySerializer(bytes) + writeheader(serializer) + serialize(serializer, result) + buffer = ray_jll.LocalMemoryBuffer(bytes, sizeof(bytes), true) - push!(returns, buffer) + metadata = ray_jll.NullPtr(ray_jll.Buffer) + inlined_ids = collect(serializer.object_ids) + inlined_refs = ray_jll.GetObjectRefs(worker, StdVector(inlined_ids)) + ray_obj = ray_jll.RayObject(buffer, metadata, inlined_refs, false) + push!(returns, ray_obj) return nothing end