Skip to content

Commit

Permalink
do not lock up addprocs on worker setup errors
Browse files Browse the repository at this point in the history
`create_worker` (invoked during `addprocs`) waits on a message from the worker to indicate success. If the worker process terminates before sending this response, `create_worker` (and therefore `addprocs`) will remain locked up.

Usually the master process does become aware of a terminated worker when the communication channel between them breaks due to the worker exiting; the message processing loop exits as a result.

This commit introduces an additional task (timer) that monitors this message processing loop while the master is waiting for a `JoinCompleteMsg` response from a worker. It makes `create_worker` return both when setup is successful (the master receives a `JoinCompleteMsg`) and also when the worker is terminated. The return value of `create_worker` is 0 when worker setup fails, and the worker id when it is successful. The return value of `addprocs` contains only the workers that were successfully launched and connected to. Added some tests for that too.
  • Loading branch information
tanmaykm committed May 13, 2020
1 parent db90631 commit 5ddcded
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 8 deletions.
15 changes: 11 additions & 4 deletions stdlib/Distributed/src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,7 @@ default_addprocs_params() = Dict{Symbol,Any}(

function setup_launched_worker(manager, wconfig, launched_q)
pid = create_worker(manager, wconfig)
(pid == 0) && return
push!(launched_q, pid)

# When starting workers on remote multi-core hosts, `launch` can (optionally) start only one
Expand Down Expand Up @@ -561,8 +562,10 @@ function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launch
let wconfig=wconfig
@async begin
pid = create_worker(manager, wconfig)
remote_do(redirect_output_from_additional_worker, frompid, pid, port)
push!(launched_q, pid)
if pid !== 0
remote_do(redirect_output_from_additional_worker, frompid, pid, port)
push!(launched_q, pid)
end
end
end
end
Expand Down Expand Up @@ -603,7 +606,11 @@ function create_worker(manager, wconfig)

# Start a new task to handle inbound messages from connected worker in master.
# Also calls `wait_connected` on TCP streams.
process_messages(w.r_stream, w.w_stream, false)
procmsg_task = process_messages(w.r_stream, w.w_stream, false)
Timer(1, interval=1) do timer
istaskstarted(procmsg_task) && istaskdone(procmsg_task) && put!(rr_ntfy_join, nothing)
isready(rr_ntfy_join) && close(timer)
end

# send address information of all workers to the new worker.
# Cluster managers set the address of each worker in `WorkerConfig.connect_at`.
Expand Down Expand Up @@ -675,7 +682,7 @@ function create_worker(manager, wconfig)
delete!(PGRP.refs, ntfy_oid)
end

return w.id
return istaskdone(procmsg_task) ? 0 : w.id
end


Expand Down
28 changes: 24 additions & 4 deletions stdlib/Distributed/test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1583,7 +1583,7 @@ end

# Issue # 22865
# Must be run on a new cluster, i.e., all workers must be in the same state.
@assert nprocs() == 1
(nprocs() > 1) && rmprocs(workers())
p1,p2 = addprocs_with_testenv(2)
@everywhere f22865(p) = remotecall_fetch(x->x.*2, p, fill(1.,2))
@test fill(2.,2) == remotecall_fetch(f22865, p1, p2)
Expand Down Expand Up @@ -1622,11 +1622,31 @@ function reuseport_tests()
@test all(in(results), procs())
end

# Test failure during worker setup
# Regression test: `addprocs` must not hang when a newly launched worker fails
# during setup; it should instead return an empty list and leave the existing
# cluster unchanged. stderr is captured so the expected connection-failure
# noise does not pollute the test log.
old_stderr = stderr
stderr_out, stderr_in = redirect_stderr()
try
# Start from a single-process cluster.
(nprocs() > 1) && rmprocs(workers())
# Sanity check: a normal launch succeeds and adds exactly one worker.
npids = addprocs(1; topology=:all_to_all, lazy=false)
@test length(npids) == 1
@test nprocs() == 2
# Rewrite the existing worker's advertised address to a port we immediately
# close, so the next worker's setup (worker-to-worker connect under
# :all_to_all) fails — presumably triggering the terminated-worker path in
# `create_worker`.
lsock = listenany(ip"127.0.0.1", 20000)
Distributed.PGRP.workers[2].config.connect_at=("127.0.0.1", lsock[1])
close(lsock[2])
# The failed launch must report no new workers and must not lock up.
npids = addprocs_with_testenv(1; topology=:all_to_all, lazy=false)
@test length(npids) == 0
@test nprocs() == 2
(nprocs() > 1) && rmprocs(workers())
finally
# Always restore stderr, even if an assertion above throws.
redirect_stderr(old_stderr)
close(stderr_in)
end

# Test that the client port is reused. SO_REUSEPORT may not be supported on
# all UNIX platforms, Linux kernels prior to 3.9 and older versions of OSX
@assert nprocs() == 1
addprocs_with_testenv(4; lazy=false)
if ccall(:jl_has_so_reuseport, Int32, ()) == 1
(nprocs() > 1) && rmprocs(workers())
addprocs_with_testenv(4; lazy=false)
reuseport_tests()
else
@info "SO_REUSEPORT is unsupported, skipping reuseport tests"
Expand Down Expand Up @@ -1686,5 +1706,5 @@ include("splitrange.jl")

# Run topology tests last after removing all workers, since a given
# cluster at any time only supports a single topology.
rmprocs(workers())
(nprocs() > 1) && rmprocs(workers())
include("topology.jl")

0 comments on commit 5ddcded

Please sign in to comment.