Skip to content

Commit

Permalink
create new tasks in the parent world (#41449)
Browse files Browse the repository at this point in the history
N.B. This means serialized tasks will discard this stateful information
and pick up new/different information.

Closes #35690
Closes #41332
  • Loading branch information
vtjnash authored Sep 15, 2021
1 parent fe6d61b commit ac83e57
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 9 deletions.
10 changes: 5 additions & 5 deletions src/process_messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ function showerror(io::IO, re::RemoteException)
showerror(io, re.captured)
end

function run_work_thunk(thunk, print_error)
function run_work_thunk(thunk::Function, print_error::Bool)
local result
try
result = thunk()
Expand Down Expand Up @@ -271,11 +271,11 @@ function process_hdr(s, validate_cookie)
end

function handle_msg(msg::CallMsg{:call}, header, r_stream, w_stream, version)
schedule_call(header.response_oid, ()->msg.f(msg.args...; msg.kwargs...))
schedule_call(header.response_oid, ()->invokelatest(msg.f, msg.args...; msg.kwargs...))
end
function handle_msg(msg::CallMsg{:call_fetch}, header, r_stream, w_stream, version)
errormonitor(@async begin
v = run_work_thunk(()->msg.f(msg.args...; msg.kwargs...), false)
v = run_work_thunk(()->invokelatest(msg.f, msg.args...; msg.kwargs...), false)
if isa(v, SyncTake)
try
deliver_result(w_stream, :call_fetch, header.notify_oid, v.v)
Expand All @@ -291,14 +291,14 @@ end

function handle_msg(msg::CallWaitMsg, header, r_stream, w_stream, version)
errormonitor(@async begin
rv = schedule_call(header.response_oid, ()->msg.f(msg.args...; msg.kwargs...))
rv = schedule_call(header.response_oid, ()->invokelatest(msg.f, msg.args...; msg.kwargs...))
deliver_result(w_stream, :call_wait, header.notify_oid, fetch(rv.c))
nothing
end)
end

function handle_msg(msg::RemoteDoMsg, header, r_stream, w_stream, version)
errormonitor(@async run_work_thunk(()->msg.f(msg.args...; msg.kwargs...), true))
errormonitor(@async run_work_thunk(()->invokelatest(msg.f, msg.args...; msg.kwargs...), true))
end

function handle_msg(msg::ResultMsg, header, r_stream, w_stream, version)
Expand Down
5 changes: 1 addition & 4 deletions src/remotecall.jl
Original file line number Diff line number Diff line change
Expand Up @@ -370,10 +370,7 @@ end
# make a thunk to call f on args in a way that simulates what would happen if
# the function were sent elsewhere
function local_remotecall_thunk(f, args, kwargs)
if isempty(args) && isempty(kwargs)
return f
end
return ()->f(args...; kwargs...)
return ()->invokelatest(f, args...; kwargs...)
end

function remotecall(f, w::LocalProcess, args...; kwargs...)
Expand Down

0 comments on commit ac83e57

Please sign in to comment.