Skip to content

Commit

Permalink
do not lock up addproc on worker setup timeout
Browse files Browse the repository at this point in the history
This is a corollary to the previous commit in JuliaLang#32290, and implements suggestions thereof.

It restricts the master to wait for a worker to respond within `Distributed.worker_timeout()` seconds. Beyond that it releases the lock on `rr_ntfy_join` with a special flag `:TIMEDOUT`. This flag is set to `:ERROR` in case of any errors during worker setup, and to `:OK` when the master received a `JoinCompleteMsg` indicating setup completion from worker.

`addprocs` returns the worker id in the list of workers it added only if it has received a `JoinCompleteMsg`, that is, only when `rr_ntfy_join` contains `:OK`. Note that the worker process may not be dead yet, and it may still be listed in `workers()` until it actually goes down.
  • Loading branch information
tanmaykm committed Jun 12, 2019
1 parent 42c4020 commit c0ecfb4
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 5 deletions.
10 changes: 8 additions & 2 deletions stdlib/Distributed/src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -601,8 +601,14 @@ function create_worker(manager, wconfig)
# Start a new task to handle inbound messages from connected worker in master.
# Also calls `wait_connected` on TCP streams.
procmsg_task = process_messages(w.r_stream, w.w_stream, false)
timeout = worker_timeout()
Timer(1, interval=1) do timer
istaskstarted(procmsg_task) && istaskdone(procmsg_task) && put!(rr_ntfy_join, nothing)
timeout -= 1
if timeout <= 0.0
put!(rr_ntfy_join, :TIMEDOUT)
elseif istaskdone(procmsg_task)
put!(rr_ntfy_join, :ERROR)
end
isready(rr_ntfy_join) && close(timer)
end

Expand Down Expand Up @@ -665,7 +671,7 @@ function create_worker(manager, wconfig)
delete!(PGRP.refs, ntfy_oid)
end

return istaskdone(procmsg_task) ? 0 : w.id
return (fetch(rr_ntfy_join.c) === :OK) ? w.id : 0
end


Expand Down
2 changes: 1 addition & 1 deletion stdlib/Distributed/src/process_messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ function handle_msg(msg::JoinCompleteMsg, header, r_stream, w_stream, version)
w.version = version

ntfy_channel = lookup_ref(header.notify_oid)
put!(ntfy_channel, w.id)
put!(ntfy_channel, :OK)

push!(default_worker_pool(), w.id)
end
15 changes: 13 additions & 2 deletions stdlib/Distributed/test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1600,14 +1600,25 @@ try
npids = addprocs(1; topology=:all_to_all, lazy=false)
@test length(npids) == 1
@test nprocs() == 2
w2_connect_at = Distributed.PGRP.workers[2].config.connect_at

# test condition where connect fails immediately
lsock = listenany(ip"127.0.0.1", 20000)
Distributed.PGRP.workers[2].config.connect_at=("127.0.0.1", lsock[1])
Distributed.PGRP.workers[2].config.connect_at = ("127.0.0.1", lsock[1])
close(lsock[2])
npids = addprocs_with_testenv(1; topology=:all_to_all, lazy=false)
@test length(npids) == 0
@test nprocs() == 2
(nprocs() > 1) && rmprocs(workers())

# test condition where connect times out
# using a non routable IP to test, ref: https://tools.ietf.org/html/rfc5737
Distributed.PGRP.workers[2].config.connect_at = ("203.0.113.0", w2_connect_at[2])
withenv("JULIA_WORKER_TIMEOUT"=>1) do
npids = addprocs_with_testenv(1; topology=:all_to_all, lazy=false)
@test length(npids) == 0
end
finally
rmprocs(workers())
redirect_stderr(old_stderr)
close(stderr_in)
end
Expand Down

0 comments on commit c0ecfb4

Please sign in to comment.