diff --git a/stdlib/Distributed/src/cluster.jl b/stdlib/Distributed/src/cluster.jl
index 9bdfde7f920d4b..eb4fa3183d800c 100644
--- a/stdlib/Distributed/src/cluster.jl
+++ b/stdlib/Distributed/src/cluster.jl
@@ -524,6 +524,7 @@ default_addprocs_params() = Dict{Symbol,Any}(
 
 function setup_launched_worker(manager, wconfig, launched_q)
     pid = create_worker(manager, wconfig)
+    (pid == 0) && return
     push!(launched_q, pid)
 
     # When starting workers on remote multi-core hosts, `launch` can (optionally) start only one
@@ -561,8 +562,10 @@ function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launch
             let wconfig=wconfig
                 @async begin
                     pid = create_worker(manager, wconfig)
-                    remote_do(redirect_output_from_additional_worker, frompid, pid, port)
-                    push!(launched_q, pid)
+                    if pid !== 0
+                        remote_do(redirect_output_from_additional_worker, frompid, pid, port)
+                        push!(launched_q, pid)
+                    end
                 end
             end
         end
@@ -603,7 +606,11 @@ function create_worker(manager, wconfig)
 
     # Start a new task to handle inbound messages from connected worker in master.
     # Also calls `wait_connected` on TCP streams.
-    process_messages(w.r_stream, w.w_stream, false)
+    procmsg_task = process_messages(w.r_stream, w.w_stream, false)
+    Timer(1, interval=1) do timer
+        istaskstarted(procmsg_task) && istaskdone(procmsg_task) && put!(rr_ntfy_join, nothing)
+        isready(rr_ntfy_join) && close(timer)
+    end
 
     # send address information of all workers to the new worker.
     # Cluster managers set the address of each worker in `WorkerConfig.connect_at`.
@@ -675,7 +682,7 @@ function create_worker(manager, wconfig)
         delete!(PGRP.refs, ntfy_oid)
     end
 
-    return w.id
+    return istaskdone(procmsg_task) ? 0 : w.id
 end
 
 
diff --git a/stdlib/Distributed/test/distributed_exec.jl b/stdlib/Distributed/test/distributed_exec.jl
index 379a2b5b46fb5b..0deb446a365965 100644
--- a/stdlib/Distributed/test/distributed_exec.jl
+++ b/stdlib/Distributed/test/distributed_exec.jl
@@ -1583,7 +1583,7 @@ end
 
 # Issue # 22865
 # Must be run on a new cluster, i.e., all workers must be in the same state.
-@assert nprocs() == 1
+(nprocs() > 1) && rmprocs(workers())
 p1,p2 = addprocs_with_testenv(2)
 @everywhere f22865(p) = remotecall_fetch(x->x.*2, p, fill(1.,2))
 @test fill(2.,2) == remotecall_fetch(f22865, p1, p2)
@@ -1622,11 +1622,31 @@ function reuseport_tests()
     @test all(in(results), procs())
 end
 
+# Test failure during worker setup
+old_stderr = stderr
+stderr_out, stderr_in = redirect_stderr()
+try
+    (nprocs() > 1) && rmprocs(workers())
+    npids = addprocs(1; topology=:all_to_all, lazy=false)
+    @test length(npids) == 1
+    @test nprocs() == 2
+    lsock = listenany(ip"127.0.0.1", 20000)
+    Distributed.PGRP.workers[2].config.connect_at=("127.0.0.1", lsock[1])
+    close(lsock[2])
+    npids = addprocs_with_testenv(1; topology=:all_to_all, lazy=false)
+    @test length(npids) == 0
+    @test nprocs() == 2
+    (nprocs() > 1) && rmprocs(workers())
+finally
+    redirect_stderr(old_stderr)
+    close(stderr_in)
+end
+
 # Test that the client port is reused. SO_REUSEPORT may not be supported on
 # all UNIX platforms, Linux kernels prior to 3.9 and older versions of OSX
-@assert nprocs() == 1
-addprocs_with_testenv(4; lazy=false)
 if ccall(:jl_has_so_reuseport, Int32, ()) == 1
+    (nprocs() > 1) && rmprocs(workers())
+    addprocs_with_testenv(4; lazy=false)
     reuseport_tests()
 else
     @info "SO_REUSEPORT is unsupported, skipping reuseport tests"
@@ -1686,5 +1706,5 @@ include("splitrange.jl")
 
 # Run topology tests last after removing all workers, since a given
 # cluster at any time only supports a single topology.
-rmprocs(workers())
+(nprocs() > 1) && rmprocs(workers())
 include("topology.jl")
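
Note: the `create_worker` change above hinges on a generic watchdog pattern — a repeating `Timer` polls the worker's message-processing task and, if that task dies before the handshake completes, posts to the join notification so the setup code unblocks instead of hanging; `create_worker` then reports failure by returning 0, which the two call sites now check. Below is a minimal standalone sketch of that pattern. The names `work` and `setup_done`, and the sleep/error body, are illustrative stand-ins (here `setup_done` is a plain `Channel` playing the role of `rr_ntfy_join`), not part of the patch.

```julia
# Sketch: unblock a waiter when a watched task dies, instead of hanging forever.
setup_done = Channel{Any}(1)

work = @async begin
    sleep(2)
    error("simulated connection failure")   # the watched task dies here
end

# Poll once per second: if the watched task has finished (including by
# throwing), post a sentinel so a blocked `take!(setup_done)` wakes up.
# Once the channel has a value, the timer shuts itself down.
Timer(1, interval=1) do timer
    istaskstarted(work) && istaskdone(work) && put!(setup_done, nothing)
    isready(setup_done) && close(timer)
end

result = take!(setup_done)    # returns `nothing` here instead of blocking
result === nothing && @info "setup failed; caller can skip this worker"
```

The sentinel-plus-return-code design mirrors the patch: the waiter is woken with a value that signals failure rather than being interrupted with an exception, and the caller (`setup_launched_worker` / `launch_n_additional_processes`) decides what to skip.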