From 83c74d5e96a12275e362914f286d788158e037f3 Mon Sep 17 00:00:00 2001
From: tan
Date: Wed, 12 Jun 2019 17:50:33 +0530
Subject: [PATCH] do not lock up addprocs on worker setup timeout

This is a follow-up to the previous commit in #32290 and implements the
suggestions made there. The master now waits at most
`Distributed.worker_timeout()` seconds for a worker to respond. Beyond
that it releases the lock on `rr_ntfy_join` with the special flag
`:TIMEDOUT`. The flag is set to `:ERROR` if any error occurs during
worker setup, and to `:OK` when the master receives a `JoinCompleteMsg`
from the worker indicating that setup is complete.

`addprocs` includes a worker's id in the list of workers it returns only
if the master has received a `JoinCompleteMsg` from that worker, that is,
only when `rr_ntfy_join` contains `:OK`. Note that a timed-out worker
process may not be dead yet; it may still be listed in `workers()` until
it actually goes down.
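For reference, the timeout used here is read from the
JULIA_WORKER_TIMEOUT environment variable via `worker_timeout()`
(a sketch of a REPL session; the 5-second value is illustrative only):

    julia> using Distributed

    julia> Distributed.worker_timeout()  # default when the variable is unset
    60.0

    julia> withenv("JULIA_WORKER_TIMEOUT" => "5") do
               Distributed.worker_timeout()
           end
    5.0

With this change, a worker that has not delivered `JoinCompleteMsg`
within that many seconds triggers `put!(rr_ntfy_join, :TIMEDOUT)`, and
`addprocs` returns without that worker's id instead of blocking forever.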
---
 stdlib/Distributed/src/cluster.jl           | 10 ++++++++--
 stdlib/Distributed/src/process_messages.jl  |  2 +-
 stdlib/Distributed/test/distributed_exec.jl | 15 +++++++++++++--
 3 files changed, 22 insertions(+), 5 deletions(-)

diff --git a/stdlib/Distributed/src/cluster.jl b/stdlib/Distributed/src/cluster.jl
index eb4fa3183d800c..7e6f419700d0dd 100644
--- a/stdlib/Distributed/src/cluster.jl
+++ b/stdlib/Distributed/src/cluster.jl
@@ -607,8 +607,14 @@ function create_worker(manager, wconfig)
     # Start a new task to handle inbound messages from connected worker in master.
     # Also calls `wait_connected` on TCP streams.
     procmsg_task = process_messages(w.r_stream, w.w_stream, false)
+    timeout = worker_timeout()
     Timer(1, interval=1) do timer
-        istaskstarted(procmsg_task) && istaskdone(procmsg_task) && put!(rr_ntfy_join, nothing)
+        timeout -= 1
+        if timeout <= 0.0
+            put!(rr_ntfy_join, :TIMEDOUT)
+        elseif istaskdone(procmsg_task)
+            put!(rr_ntfy_join, :ERROR)
+        end
         isready(rr_ntfy_join) && close(timer)
     end
 
@@ -682,7 +688,7 @@
         delete!(PGRP.refs, ntfy_oid)
     end
 
-    return istaskdone(procmsg_task) ? 0 : w.id
+    return (fetch(rr_ntfy_join.c) === :OK) ? w.id : 0
 end
 
diff --git a/stdlib/Distributed/src/process_messages.jl b/stdlib/Distributed/src/process_messages.jl
index 7361d4d057e656..7402481334b736 100644
--- a/stdlib/Distributed/src/process_messages.jl
+++ b/stdlib/Distributed/src/process_messages.jl
@@ -391,7 +391,7 @@ function handle_msg(msg::JoinCompleteMsg, header, r_stream, w_stream, version)
     w.version = version
 
     ntfy_channel = lookup_ref(header.notify_oid)
-    put!(ntfy_channel, w.id)
+    put!(ntfy_channel, :OK)
 
     push!(default_worker_pool(), w.id)
 end
diff --git a/stdlib/Distributed/test/distributed_exec.jl b/stdlib/Distributed/test/distributed_exec.jl
index 0deb446a365965..3e63d87726ed4f 100644
--- a/stdlib/Distributed/test/distributed_exec.jl
+++ b/stdlib/Distributed/test/distributed_exec.jl
@@ -1630,14 +1630,25 @@ try
     npids = addprocs(1; topology=:all_to_all, lazy=false)
     @test length(npids) == 1
     @test nprocs() == 2
+    w2_connect_at = Distributed.PGRP.workers[2].config.connect_at
+
+    # test condition where connect fails immediately
     lsock = listenany(ip"127.0.0.1", 20000)
-    Distributed.PGRP.workers[2].config.connect_at=("127.0.0.1", lsock[1])
+    Distributed.PGRP.workers[2].config.connect_at = ("127.0.0.1", lsock[1])
     close(lsock[2])
     npids = addprocs_with_testenv(1; topology=:all_to_all, lazy=false)
     @test length(npids) == 0
     @test nprocs() == 2
-    (nprocs() > 1) && rmprocs(workers())
+
+    # test condition where connect times out
+    # using a non routable IP to test, ref: https://tools.ietf.org/html/rfc5737
+    Distributed.PGRP.workers[2].config.connect_at = ("203.0.113.0", w2_connect_at[2])
+    withenv("JULIA_WORKER_TIMEOUT"=>1) do
+        npids = addprocs_with_testenv(1; topology=:all_to_all, lazy=false)
+        @test length(npids) == 0
+    end
 finally
+    rmprocs(workers())
     redirect_stderr(old_stderr)
     close(stderr_in)
 end