diff --git a/stdlib/Distributed/src/cluster.jl b/stdlib/Distributed/src/cluster.jl index 4eb4c8a3f663b..4ae5e21e19782 100644 --- a/stdlib/Distributed/src/cluster.jl +++ b/stdlib/Distributed/src/cluster.jl @@ -601,8 +601,14 @@ function create_worker(manager, wconfig) # Start a new task to handle inbound messages from connected worker in master. # Also calls `wait_connected` on TCP streams. procmsg_task = process_messages(w.r_stream, w.w_stream, false) + timeout = worker_timeout() Timer(1, interval=1) do timer - istaskstarted(procmsg_task) && istaskdone(procmsg_task) && put!(rr_ntfy_join, nothing) + timeout -= 1 + if timeout <= 0.0 + put!(rr_ntfy_join, :TIMEDOUT) + elseif istaskdone(procmsg_task) + put!(rr_ntfy_join, :ERROR) + end isready(rr_ntfy_join) && close(timer) end @@ -665,7 +671,7 @@ function create_worker(manager, wconfig) delete!(PGRP.refs, ntfy_oid) end - return istaskdone(procmsg_task) ? 0 : w.id + return (istaskdone(procmsg_task) || (first(rr_ntfy_join.c.data) !== :OK)) ? 0 : w.id end diff --git a/stdlib/Distributed/src/process_messages.jl b/stdlib/Distributed/src/process_messages.jl index 7361d4d057e65..7402481334b73 100644 --- a/stdlib/Distributed/src/process_messages.jl +++ b/stdlib/Distributed/src/process_messages.jl @@ -391,7 +391,7 @@ function handle_msg(msg::JoinCompleteMsg, header, r_stream, w_stream, version) w.version = version ntfy_channel = lookup_ref(header.notify_oid) - put!(ntfy_channel, w.id) + put!(ntfy_channel, :OK) push!(default_worker_pool(), w.id) end diff --git a/stdlib/Distributed/test/distributed_exec.jl b/stdlib/Distributed/test/distributed_exec.jl index 3ec2c73719124..7271f13113c7f 100644 --- a/stdlib/Distributed/test/distributed_exec.jl +++ b/stdlib/Distributed/test/distributed_exec.jl @@ -1600,14 +1600,23 @@ try npids = addprocs(1; topology=:all_to_all, lazy=false) @test length(npids) == 1 @test nprocs() == 2 + w2_connect_at = Distributed.PGRP.workers[2].config.connect_at + + # test condition where connect fails immediately lsock = listenany(ip"127.0.0.1", 20000) - Distributed.PGRP.workers[2].config.connect_at=("127.0.0.1", lsock[1]) + Distributed.PGRP.workers[2].config.connect_at = ("127.0.0.1", lsock[1]) close(lsock[2]) npids = addprocs_with_testenv(1; topology=:all_to_all, lazy=false) @test length(npids) == 0 @test nprocs() == 2 - (nprocs() > 1) && rmprocs(workers()) + + # test condition where connect times out + # using a non routable IP to test, ref: https://tools.ietf.org/html/rfc5737 + Distributed.PGRP.workers[2].config.connect_at = ("203.0.113.0", w2_connect_at[2]) + npids = addprocs_with_testenv(1; topology=:all_to_all, lazy=false) + @test length(npids) == 0 finally + rmprocs(workers()) redirect_stderr(old_stderr) close(stderr_in) end