Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

do not lock up addprocs on worker setup errors #32290

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 45 additions & 14 deletions stdlib/Distributed/src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,10 @@ if istaskdone(t) # Check if `addprocs` has completed to ensure `fetch` doesn't
end
end
```

Only the IDs of workers that were successfully launched and connected to are
returned from a call to `addprocs`. It is therefore a good idea to check the
return value and confirm that the requested number of processes was indeed added.
"""
function addprocs(manager::ClusterManager; kwargs...)
init_multi()
Expand Down Expand Up @@ -540,6 +544,7 @@ default_addprocs_params() = Dict{Symbol,Any}(

function setup_launched_worker(manager, wconfig, launched_q)
pid = create_worker(manager, wconfig)
(pid == 0) && return
push!(launched_q, pid)

# When starting workers on remote multi-core hosts, `launch` can (optionally) start only one
Expand Down Expand Up @@ -577,8 +582,10 @@ function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launch
let wconfig=wconfig
@async begin
pid = create_worker(manager, wconfig)
remote_do(redirect_output_from_additional_worker, frompid, pid, port)
push!(launched_q, pid)
if pid !== 0
remote_do(redirect_output_from_additional_worker, frompid, pid, port)
push!(launched_q, pid)
end
end
end
end
Expand All @@ -588,13 +595,27 @@ end
function create_worker(manager, wconfig)
# only node 1 can add new nodes, since nobody else has the full list of address:port
@assert LPROC.id == 1
timeout = worker_timeout()

# initiate a connect. Does not wait for connection completion in case of TCP.
w = Worker()
local r_s, w_s


try
(r_s, w_s) = connect(manager, w.id, wconfig)
rw_future = Future()
@async try
put!(rw_future, connect(manager, w.id, wconfig))
catch connect_error
put!(rw_future, connect_error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move put! outside the try/catch, and rethrow on unexpected error types

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name connect_error may be misleading. The intention was to handle any error here in the same manner.

end
# allow timeout for both steps - reading host:port and actually connecting
Timer(worker_timeout() * 2) do timer
isready(rw_future) || put!(rw_future, nothing)
end
result = fetch(rw_future)
(r_s, w_s) = isa(result, Nothing) ? throw(LaunchWorkerError("Timed out connecting to worker.")) :
isa(result, Exception) ? throw(result) :
result
catch ex
try
deregister_worker(w.id)
Expand All @@ -619,7 +640,22 @@ function create_worker(manager, wconfig)

# Start a new task to handle inbound messages from connected worker in master.
# Also calls `wait_connected` on TCP streams.
process_messages(w.r_stream, w.w_stream, false)
procmsg_task = process_messages(w.r_stream, w.w_stream, false)
timeout = worker_timeout()
Timer(1, interval=1) do timer
timeout -= 1
try
if timeout <= 0.0
put!(rr_ntfy_join, :TIMEDOUT)
elseif istaskdone(procmsg_task)
put!(rr_ntfy_join, :ERROR)
end
catch
# can happen if rr_ntfy_join is already filled
close(timer)
end
isready(rr_ntfy_join) && close(timer)
end

# send address information of all workers to the new worker.
# Cluster managers set the address of each worker in `WorkerConfig.connect_at`.
Expand All @@ -635,6 +671,7 @@ function create_worker(manager, wconfig)
# - Workers with incoming connection requests write back their Version and an IdentifySocketAckMsg message
# - On master, receiving a JoinCompleteMsg triggers rr_ntfy_join (signifies that worker setup is complete)

maxwait = worker_timeout()
join_list = []
if PGRP.topology === :all_to_all
# need to wait for lower worker pids to have completed connecting, since the numerical value
Expand All @@ -656,8 +693,8 @@ function create_worker(manager, wconfig)
waittime = 0
while wconfig.connect_idents !== nothing &&
length(wlist) < length(wconfig.connect_idents)
if waittime >= timeout
error("peer workers did not connect within $timeout seconds")
if waittime >= maxwait
error("peer workers did not connect within $maxwait seconds")
end
sleep(1.0)
waittime += 1
Expand All @@ -680,18 +717,12 @@ function create_worker(manager, wconfig)
send_msg_now(w, MsgHeader(RRID(0,0), ntfy_oid), join_message)

@async manage(w.manager, w.id, w.config, :register)
# wait for rr_ntfy_join with timeout
timedout = false
@async (sleep($timeout); timedout = true; put!(rr_ntfy_join, 1))
wait(rr_ntfy_join)
if timedout
error("worker did not connect within $timeout seconds")
end
lock(client_refs) do
delete!(PGRP.refs, ntfy_oid)
end

return w.id
return (fetch(rr_ntfy_join.c) === :OK) ? w.id : 0
end


Expand Down
18 changes: 16 additions & 2 deletions stdlib/Distributed/src/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -714,12 +714,26 @@ It should cause the remote worker specified by `pid` to exit.
on `pid`.
"""
"""
    kill(manager::ClusterManager, pid::Int, config::WorkerConfig)

Default cluster-manager implementation of worker termination: remotely invoke
`exit` on worker `pid`. Best effort — never throws. A `ProcessExitedException`
carrying this same `pid` means the worker is already gone and is ignored
silently; any other failure is reported with a warning. Returns `nothing`.
"""
function kill(manager::ClusterManager, pid::Int, config::WorkerConfig)
    try
        # For TCP based transports this will result in a close of the socket
        # at our end, which will result in a cleanup of the worker.
        remote_do(exit, pid)
    catch ex
        # The benign case: the worker in question has already exited.
        benign = isa(ex, ProcessExitedException) && ex.worker_id === pid
        benign || @warn("Failed to kill remote worker $pid. Worker unreachable, unresponsive or already exited - $ex.")
    end
    nothing
end

"""
    kill(manager::SSHManager, pid::Int, config::WorkerConfig)

SSH cluster-manager worker termination: remotely invoke `exit` on worker
`pid`, then tear down the SSH tunnel recorded in `config`. The remote call is
best effort — a `ProcessExitedException` for this same `pid` (worker already
gone) is ignored silently, and any other failure only produces a warning, so
the tunnel is cancelled in every case. Returns `nothing`.
"""
function kill(manager::SSHManager, pid::Int, config::WorkerConfig)
    try
        # For TCP based transports this will result in a close of the socket
        # at our end, which will result in a cleanup of the worker.
        remote_do(exit, pid)
    catch ex
        # The benign case: the worker in question has already exited.
        benign = isa(ex, ProcessExitedException) && ex.worker_id === pid
        benign || @warn("Failed to kill remote worker $pid. Worker unreachable, unresponsive or already exited - $ex.")
    end
    # Always clean up the tunnel, even if the remote exit request failed.
    cancel_ssh_tunnel(config)
    nothing
end
2 changes: 1 addition & 1 deletion stdlib/Distributed/src/process_messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ function handle_msg(msg::JoinCompleteMsg, header, r_stream, w_stream, version)
w.version = version

ntfy_channel = lookup_ref(header.notify_oid)
put!(ntfy_channel, w.id)
put!(ntfy_channel, :OK)

push!(default_worker_pool(), w.id)
end
Loading