Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
fonsp committed Sep 27, 2023
1 parent ffb285d commit 338f0d2
Showing 1 changed file with 16 additions and 9 deletions.
25 changes: 16 additions & 9 deletions src/Malt.jl
Original file line number Diff line number Diff line change
Expand Up @@ -134,38 +134,43 @@ mutable struct Worker <: AbstractWorker
)
atexit(() -> stop(w))

_exit_loop(w)
_receive_loop(w)

return w
end
end

Base.summary(io::IO, w::Worker) = write(io, "Malt.Worker on port $(w.port) with PID $(w.proc_pid)")


function _receive_loop(worker::Worker)
io = worker.current_socket

exit_handler_task = @async for _i in Iterators.countfrom(1)

function _exit_loop(worker::Worker)
@async for _i in Iterators.countfrom(1)
try
if !isrunning(worker)
# the worker got shut down, which means that we will never receive one of the expected_replies. So let's give all of them a special_worker_terminated reply.
for c in values(worker.expected_replies)
isready(c) || put!(c, WorkerResult(MsgType.special_worker_terminated, nothing))
end
break
end
sleep(1)
catch e
@error "asdfdfs" exception=(e,catch_backtrace())
@error "Unexpection error inside the exit loop" worker exception=(e,catch_backtrace())
end
end

end

function _receive_loop(worker::Worker)
io = worker.current_socket


# Here we use:
# `for _i in Iterators.countfrom(1)`
# instead of
# `while true`
# as a workaround for https://github.com/JuliaLang/julia/issues/37154
listen_task = @async for _i in Iterators.countfrom(1)
@async for _i in Iterators.countfrom(1)
try
if !isopen(io)
@debug("HOST: io closed.")
Expand Down Expand Up @@ -610,7 +615,7 @@ latest request (`remote_call*` or `remote_eval*`) that was sent to the worker.
"""
function interrupt(w::Worker)
if !isrunning(w)
@warn "Tried to interrupt a worker that has already stopped running." summary(w)
@warn "Tried to interrupt a worker that has already shut down." summary(w)
else
if Sys.iswindows()
ccall((:GenerateConsoleCtrlEvent,"Kernel32"), Bool, (UInt32, UInt32), UInt32(1), UInt32(getpid(w.proc)))
Expand All @@ -626,6 +631,8 @@ function interrupt(w::InProcessWorker)
end




# Based on `Base.task_done_hook`
function _rethrow_to_repl(e::InterruptException; rethrow_regular::Bool=false)
if isdefined(Base, :active_repl_backend) &&
Expand Down

0 comments on commit 338f0d2

Please sign in to comment.