Skip to content

Commit

Permalink
Avoid infinite wait when process interrupted (#63)
Browse files Browse the repository at this point in the history
  • Loading branch information
fonsp authored Sep 27, 2023
1 parent 20f8218 commit b673724
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 2 deletions.
23 changes: 23 additions & 0 deletions src/Malt.jl
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ end
function unwrap_worker_result(worker::AbstractWorker, result::WorkerResult)
if result.msg_type == MsgType.special_serialization_failure
throw(ErrorException("Error deserializing data from $(summary(worker)):\n\n$(sprint(Base.showerror, result.value))"))
elseif result.msg_type == MsgType.special_worker_terminated
throw(TerminatedWorkerException())
elseif result.msg_type == MsgType.from_worker_call_failure
throw(RemoteException(worker, result.value))
else
Expand Down Expand Up @@ -129,6 +131,7 @@ mutable struct Worker <: AbstractWorker
)
atexit(() -> stop(w))

_exit_loop(w)
_receive_loop(w)

return w
Expand All @@ -138,8 +141,28 @@ end
Base.summary(io::IO, w::Worker) = write(io, "Malt.Worker on port $(w.port) with PID $(w.proc_pid)")



function _exit_loop(worker::Worker)
@async for _i in Iterators.countfrom(1)
try
if !isrunning(worker)
# the worker got shut down, which means that we will never receive one of the expected_replies. So let's give all of them a special_worker_terminated reply.
for c in values(worker.expected_replies)
isready(c) || put!(c, WorkerResult(MsgType.special_worker_terminated, nothing))
end
break
end
sleep(1)
catch e
@error "Unexpection error inside the exit loop" worker exception=(e,catch_backtrace())
end
end
end

function _receive_loop(worker::Worker)
io = worker.current_socket


# Here we use:
# `for _i in Iterators.countfrom(1)`
# instead of
Expand Down
1 change: 1 addition & 0 deletions src/shared.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const MsgType = (
from_worker_call_failure = UInt8(81),
###
special_serialization_failure = UInt8(100),
special_worker_terminated = UInt8(101),
)

const MsgID = UInt64
Expand Down
22 changes: 20 additions & 2 deletions test/basic.jl
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,28 @@
@testset "Worker management" begin
w = W()
@test m.isrunning(w) === true
@test m.remote_call_fetch(&, w, true, true)

W === m.Worker && @test length(m.__iNtErNaL_get_running_procs()) == 1
# Terminating workers takes about 0.5s
m.stop(w)

if W === m.InProcessWorker
m.stop(w)
else
start = time()
task = m.remote_call(sleep, w, 10)

m.stop(w)

@test try
wait(task)
catch e
e
end isa TaskFailedException
stop = time()
@test stop - start < 8
end


@test m.isrunning(w) === false
W === m.Worker && @test length(m.__iNtErNaL_get_running_procs()) == 0
end
Expand Down

0 comments on commit b673724

Please sign in to comment.