diff --git a/NEWS.md b/NEWS.md
index b25a8940475263..37e07e95c46900 100644
--- a/NEWS.md
+++ b/NEWS.md
@@ -79,6 +79,9 @@ This section lists changes that do not have deprecation warnings.
     the type of `n`). Use the corresponding mutating functions `randperm!` and `randcycle!`
     to control the array type ([#22723]).
 
+  * Worker-worker connections are set up lazily for an `:all_to_all` topology. Use the keyword
+    argument `lazy=false` to force all connections to be set up during an `addprocs` call. ([#22814])
+
 Library improvements
 --------------------
 
diff --git a/base/distributed/cluster.jl b/base/distributed/cluster.jl
index f82314bab58e5d..ed4b87762a3012 100644
--- a/base/distributed/cluster.jl
+++ b/base/distributed/cluster.jl
@@ -59,6 +59,7 @@ mutable struct Worker
     state::WorkerState
     c_state::Condition      # wait for state changes
     ct_time::Float64        # creation time
+    conn_func::Nullable{Function}  # used to set up connections lazily
 
     r_stream::IO
     w_stream::IO
@@ -82,12 +83,13 @@ mutable struct Worker
         w
     end
 
-    function Worker(id::Int)
+    Worker(id::Int) = Worker(id, Nullable{Function}())
+    function Worker(id::Int, conn_func)
        @assert id > 0
        if haskey(map_pid_wrkr, id)
            return map_pid_wrkr[id]
        end
-        w=new(id, [], [], false, W_CREATED, Condition(), time())
+        w=new(id, [], [], false, W_CREATED, Condition(), time(), conn_func)
        register_worker(w)
        w
     end
@@ -102,21 +104,56 @@ end
 
 function check_worker_state(w::Worker)
     if w.state == W_CREATED
-        if PGRP.topology == :all_to_all
-            # Since higher pids connect with lower pids, the remote worker
-            # may not have connected to us yet. Wait for some time.
-            timeout = worker_timeout() - (time() - w.ct_time)
-            timeout <= 0 && error("peer $(w.id) has not connected to $(myid())")
-
-            @schedule (sleep(timeout); notify(w.c_state; all=true))
-            wait(w.c_state)
-            w.state == W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
+        if !isclusterlazy()
+            if PGRP.topology == :all_to_all
+                # Since higher pids connect with lower pids, the remote worker
+                # may not have connected to us yet. Wait for some time.
+                wait_for_conn(w)
+            else
+                error("peer $(w.id) is not connected to $(myid()). Topology: " * string(PGRP.topology))
+            end
         else
-            error("peer $(w.id) is not connected to $(myid()). Topology : " * string(PGRP.topology))
+            w.ct_time = time()
+            if myid() > w.id
+                @schedule exec_conn_func(w)
+            else
+                # route request via node 1
+                @schedule remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
+            end
+            wait_for_conn(w)
         end
     end
 end
 
+exec_conn_func(id::Int) = exec_conn_func(worker_from_id(id))
+function exec_conn_func(w::Worker)
+    if isnull(w.conn_func)
+        return wait_for_conn(w) # Some other task may be trying to connect at the same time.
+    end
+
+    try
+        f = get(w.conn_func)
+        w.conn_func = Nullable{Function}()
+        f()
+    catch e
+        w.conn_func = () -> throw(e)
+        rethrow(e)
+    end
+    nothing
+end
+
+function wait_for_conn(w)
+    if w.state == W_CREATED
+        timeout = worker_timeout() - (time() - w.ct_time)
+        timeout <= 0 && error("peer $(w.id) has not connected to $(myid())")
+
+        @schedule (sleep(timeout); notify(w.c_state; all=true))
+        wait(w.c_state)
+        w.state == W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
+    end
+    nothing
+end
+
 ## process group creation ##
 
 mutable struct LocalProcess
@@ -340,6 +377,17 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
     params = merge(default_addprocs_params(), AnyDict(kwargs))
     topology(Symbol(params[:topology]))
 
+    if PGRP.topology != :all_to_all
+        params[:lazy] = false
+    end
+
+    if isnull(PGRP.lazy) || nprocs() == 1
+        PGRP.lazy = Nullable{Bool}(params[:lazy])
+    elseif isclusterlazy() != params[:lazy]
+        throw(ArgumentError(string("Active workers with lazy=", isclusterlazy(),
+                                   ". Cannot set lazy=", params[:lazy])))
+    end
+
     # References to launched workers, filled when each worker is fully initialized and
     # has connected to all nodes.
     launched_q = Int[]   # Asynchronously filled by the launch method
@@ -396,7 +444,8 @@ default_addprocs_params() = AnyDict(
     :dir      => pwd(),
     :exename  => joinpath(JULIA_HOME, julia_exename()),
     :exeflags => ``,
-    :enable_threaded_blas => false)
+    :enable_threaded_blas => false,
+    :lazy => true)
 
 function setup_launched_worker(manager, wconfig, launched_q)
@@ -517,7 +566,7 @@ function create_worker(manager, wconfig)
 
     all_locs = map(x -> isa(x, Worker) ? (get(x.config.connect_at, ()), x.id) : ((), x.id, true), join_list)
     send_connection_hdr(w, true)
-    join_message = JoinPGRPMsg(w.id, all_locs, PGRP.topology, get(wconfig.enable_threaded_blas, false))
+    join_message = JoinPGRPMsg(w.id, all_locs, PGRP.topology, get(wconfig.enable_threaded_blas, false), isclusterlazy())
     send_msg_now(w, MsgHeader(RRID(0,0), ntfy_oid), join_message)
 
     @schedule manage(w.manager, w.id, w.config, :register)
@@ -619,8 +668,9 @@ mutable struct ProcessGroup
     workers::Array{Any,1}
     refs::Dict                  # global references
     topology::Symbol
+    lazy::Nullable{Bool}
 
-    ProcessGroup(w::Array{Any,1}) = new("pg-default", w, Dict(), :all_to_all)
+    ProcessGroup(w::Array{Any,1}) = new("pg-default", w, Dict(), :all_to_all, Nullable{Bool}())
 end
 const PGRP = ProcessGroup([])
 
@@ -634,6 +684,14 @@ function topology(t)
     t
 end
 
+function isclusterlazy()
+    if isnull(PGRP.lazy)
+        return false
+    else
+        return get(PGRP.lazy)
+    end
+end
+
 get_bind_addr(pid::Integer) = get_bind_addr(worker_from_id(pid))
 get_bind_addr(w::LocalProcess) = LPROC.bind_addr
 function get_bind_addr(w::Worker)
@@ -667,7 +725,7 @@ myid() = LPROC.id
 Get the number of available processes.
 """
 function nprocs()
-    if myid() == 1 || PGRP.topology == :all_to_all
+    if myid() == 1 || (PGRP.topology == :all_to_all && !isclusterlazy())
         n = length(PGRP.workers)
         # filter out workers in the process of being setup/shutdown.
         for jw in PGRP.workers
@@ -698,7 +756,7 @@ end
 Returns a list of all process identifiers.
 """
 function procs()
-    if myid() == 1 || PGRP.topology == :all_to_all
+    if myid() == 1 || (PGRP.topology == :all_to_all && !isclusterlazy())
         # filter out workers in the process of being setup/shutdown.
         return Int[x.id for x in PGRP.workers if isa(x, LocalProcess) || (x.state == W_CONNECTED)]
     else
@@ -707,7 +765,7 @@ function procs()
 end
 
 function id_in_procs(id)  # faster version of `id in procs()`
-    if myid() == 1 || PGRP.topology == :all_to_all
+    if myid() == 1 || (PGRP.topology == :all_to_all && !isclusterlazy())
         for x in PGRP.workers
             if (x.id::Int) == id && (isa(x, LocalProcess) || (x::Worker).state == W_CONNECTED)
                 return true
@@ -903,7 +961,7 @@ function deregister_worker(pg, pid)
         if myid() == 1 && isdefined(w, :config)
             # Notify the cluster manager of this workers death
             manage(w.manager, w.id, w.config, :deregister)
-            if PGRP.topology != :all_to_all
+            if PGRP.topology != :all_to_all || isclusterlazy()
                 for rpid in workers()
                     try
                         remote_do(deregister_worker, rpid, pid)
diff --git a/base/distributed/managers.jl b/base/distributed/managers.jl
index f14f46829f3124..8a09063dceef4d 100644
--- a/base/distributed/managers.jl
+++ b/base/distributed/managers.jl
@@ -100,6 +100,10 @@ Keyword arguments:
   A worker with a cluster manager identity `ident` will connect to all workers specified
   in `connect_idents`.
 
+* `lazy`: Applicable only with `topology=:all_to_all`. If `true`, worker-worker connections
+  are set up lazily, i.e. they are set up at the first instance of a remote call between
+  workers. The default is `true`.
+
 Environment variables :
 
@@ -302,7 +306,7 @@ addprocs(; kwargs...) = addprocs(Sys.CPU_CORES; kwargs...)
 Launches workers using the in-built `LocalManager` which only launches workers on the
 local host. This can be used to take advantage of multiple cores. `addprocs(4)` will add 4
 processes on the local machine. If `restrict` is `true`, binding is restricted to
-`127.0.0.1`. Keyword args `dir`, `exename`, `exeflags`, `topology`, and
+`127.0.0.1`. Keyword args `dir`, `exename`, `exeflags`, `topology`, `lazy`, and
 `enable_threaded_blas` have the same effect as documented for `addprocs(machines)`.
 """
 function addprocs(np::Integer; restrict=true, kwargs...)
diff --git a/base/distributed/messages.jl b/base/distributed/messages.jl
index df5cfd0255aa88..12f347fa493f0f 100644
--- a/base/distributed/messages.jl
+++ b/base/distributed/messages.jl
@@ -68,6 +68,7 @@ struct JoinPGRPMsg <: AbstractMsg
     other_workers::Array
     topology::Symbol
     enable_threaded_blas::Bool
+    lazy::Bool
 end
 struct JoinCompleteMsg <: AbstractMsg
     cpu_cores::Int
diff --git a/base/distributed/process_messages.jl b/base/distributed/process_messages.jl
index 1eba6eafd496d1..a86799c5eae33f 100644
--- a/base/distributed/process_messages.jl
+++ b/base/distributed/process_messages.jl
@@ -307,14 +307,22 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version)
         disable_threaded_libs()
     end
 
+    lazy = msg.lazy
+    PGRP.lazy = Nullable{Bool}(lazy)
+
     wait_tasks = Task[]
     for (connect_at, rpid) in msg.other_workers
         wconfig = WorkerConfig()
         wconfig.connect_at = connect_at
 
         let rpid=rpid, wconfig=wconfig
-            t = @async connect_to_peer(cluster_manager, rpid, wconfig)
-            push!(wait_tasks, t)
+            if lazy
+                # The constructor registers the object with a global registry.
+                Worker(rpid, Nullable{Function}(()->connect_to_peer(cluster_manager, rpid, wconfig)))
+            else
+                t = @async connect_to_peer(cluster_manager, rpid, wconfig)
+                push!(wait_tasks, t)
+            end
         end
     end
 
diff --git a/doc/src/manual/parallel-computing.md b/doc/src/manual/parallel-computing.md
index 04e1400b295d0e..0d31a561f149ed 100644
--- a/doc/src/manual/parallel-computing.md
+++ b/doc/src/manual/parallel-computing.md
@@ -1300,6 +1300,12 @@ connected to each other:
     fields `ident` and `connect_idents` in `WorkerConfig`. A worker with a cluster-manager-provided
     identity `ident` will connect to all workers specified in `connect_idents`.
 
+Keyword argument `lazy` only affects the `:all_to_all` topology. If `true`, the cluster starts
+off with only the master connected to all workers. Specific worker-worker connections are
+established at the first remote invocation between two workers. This reduces the resources
+initially allocated for intra-cluster communication: connections are set up as the runtime
+requirements of a parallel program demand them. The default value for `lazy` is `true`.
+
 Currently, sending a message between unconnected workers results in an error. This behaviour,
 as with the functionality and interface, should be considered experimental in nature and may
 change in future releases.
diff --git a/test/distributed_exec.jl b/test/distributed_exec.jl
index 7226a5c0a83f63..46d331ce53f504 100644
--- a/test/distributed_exec.jl
+++ b/test/distributed_exec.jl
@@ -13,47 +13,6 @@ include("testenv.jl")
 addprocs_with_testenv(4)
 @test nprocs() == 5
 
-function reuseport_tests()
-    # Run the test on all processes.
-    results = asyncmap(procs()) do p
-        remotecall_fetch(p) do
-            ports_lower = []        # ports of pids lower than myid()
-            ports_higher = []       # ports of pids higher than myid()
-            for w in Base.Distributed.PGRP.workers
-                w.id == myid() && continue
-                port = Base._sockname(w.r_stream, true)[2]
-                if (w.id == 1)
-                    # master connects to workers
-                    push!(ports_higher, port)
-                elseif w.id < myid()
-                    push!(ports_lower, port)
-                elseif w.id > myid()
-                    push!(ports_higher, port)
-                end
-            end
-            @assert (length(ports_lower) + length(ports_higher)) == nworkers()
-            for portset in [ports_lower, ports_higher]
-                if (length(portset) > 0) && (length(unique(portset)) != 1)
-                    warn("SO_REUSEPORT TESTS FAILED. UNSUPPORTED/OLDER UNIX VERSION?")
-                    return 0
-                end
-            end
-            return myid()
-        end
-    end
-
-    # Ensure that the code has indeed been successfully executed everywhere
-    @test all(p -> p in results, procs())
-end
-
-# Test that the client port is reused. SO_REUSEPORT may not be supported on
-# all UNIX platforms, Linux kernels prior to 3.9 and older versions of OSX
-if ccall(:jl_has_so_reuseport, Int32, ()) == 1
-    reuseport_tests()
-else
-    info("SO_REUSEPORT is unsupported, skipping reuseport tests.")
-end
-
 id_me = myid()
 id_other = filter(x -> x != id_me, procs())[rand(1:(nprocs()-1))]
 
@@ -1817,6 +1776,49 @@ p1,p2 = addprocs_with_testenv(2)
 @everywhere f22865(p) = remotecall_fetch(x->x.*2, p, ones(2))
 @test ones(2).*2 == remotecall_fetch(f22865, p1, p2)
 
+function reuseport_tests()
+    # Run the test on all processes.
+    results = asyncmap(procs()) do p
+        remotecall_fetch(p) do
+            ports_lower = []        # ports of pids lower than myid()
+            ports_higher = []       # ports of pids higher than myid()
+            for w in Base.Distributed.PGRP.workers
+                w.id == myid() && continue
+                port = Base._sockname(w.r_stream, true)[2]
+                if (w.id == 1)
+                    # master connects to workers
+                    push!(ports_higher, port)
+                elseif w.id < myid()
+                    push!(ports_lower, port)
+                elseif w.id > myid()
+                    push!(ports_higher, port)
+                end
+            end
+            @assert (length(ports_lower) + length(ports_higher)) == nworkers()
+            for portset in [ports_lower, ports_higher]
+                if (length(portset) > 0) && (length(unique(portset)) != 1)
+                    warn("SO_REUSEPORT TESTS FAILED. UNSUPPORTED/OLDER UNIX VERSION?")
+                    return 0
+                end
+            end
+            return myid()
+        end
+    end
+
+    # Ensure that the code has indeed been successfully executed everywhere
+    @test all(p -> p in results, procs())
+end
+
+# Test that the client port is reused. SO_REUSEPORT may not be supported on
+# all UNIX platforms, Linux kernels prior to 3.9 and older versions of OSX
+if ccall(:jl_has_so_reuseport, Int32, ()) == 1
+    rmprocs(workers())
+    addprocs_with_testenv(4; lazy=false)
+    reuseport_tests()
+else
+    info("SO_REUSEPORT is unsupported, skipping reuseport tests.")
+end
+
 # Run topology tests last after removing all workers, since a given
 # cluster at any time only supports a single topology.
 rmprocs(workers())
diff --git a/test/topology.jl b/test/topology.jl
index 52cf1bfa29be34..1d5ef2b2176126 100644
--- a/test/topology.jl
+++ b/test/topology.jl
@@ -90,3 +90,48 @@ for p1 in workers()
 end
 
 remove_workers_and_test()
+
+# Test `lazy` connection setup.
+function def_count_conn()
+    @everywhere function count_connected_workers()
+        count(x -> isa(x, Base.Distributed.Worker) && isdefined(x, :r_stream) && isopen(x.r_stream),
+              Base.Distributed.PGRP.workers)
+    end
+end
+
+addprocs_with_testenv(8)
+def_count_conn()
+
+# Test for 10 random combinations.
+wl = workers()
+combinations = []
+while length(combinations) < 10
+    from = rand(wl)
+    to = rand(wl)
+    if from == to || ((from,to) in combinations) || ((to,from) in combinations)
+        continue
+    else
+        push!(combinations, (from,to))
+    end
+end
+
+# Initially only master-worker connections ought to be set up.
+expected_num_conns = 8
+num_conns = sum(asyncmap(p->remotecall_fetch(count_connected_workers,p), workers()))
+@test num_conns == expected_num_conns
+
+for (i, (from,to)) in enumerate(combinations)
+    remotecall_wait(topid->remotecall_fetch(myid, topid), from, to)
+    expected_num_conns += 2    # one connection endpoint on each of `from` and `to`
+    num_conns = sum(asyncmap(p->remotecall_fetch(count_connected_workers,p), workers()))
+    @test num_conns == expected_num_conns
+end
+
+# With lazy=false, all connections ought to be set up during `addprocs`.
+rmprocs(workers())
+addprocs_with_testenv(8; lazy=false)
+def_count_conn()
+@test sum(asyncmap(p->remotecall_fetch(count_connected_workers,p), workers())) == 64
+
+# Cannot add more workers with a different `lazy` value.
+@test_throws ArgumentError addprocs_with_testenv(1; lazy=true)
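To see the feature end to end, here is a REPL sketch of the behaviour this patch introduces. It assumes a fresh local session on a build with the patch applied (so the four workers get pids 2-5); the `n_conns` helper is not part of the patch, it merely mirrors `count_connected_workers` from `test/topology.jl`, counting open connection endpoints via the internal `Base.Distributed.PGRP` state.

```julia
addprocs(4)    # lazy=true is the default for the :all_to_all topology

# Hypothetical helper: count the open connections worker p holds, by the
# same criterion as count_connected_workers in test/topology.jl.
n_conns(p) = remotecall_fetch(p) do
    count(w -> isa(w, Base.Distributed.Worker) &&
               isdefined(w, :r_stream) && isopen(w.r_stream),
          Base.Distributed.PGRP.workers)
end

# Initially each worker is connected only to the master: 4 endpoints in all.
@assert sum(asyncmap(n_conns, workers())) == 4

# The first remote call between workers 2 and 3 sets up their connection
# on demand, adding one endpoint on each side.
remotecall_fetch(pid -> remotecall_fetch(myid, pid), 2, 3)
@assert sum(asyncmap(n_conns, workers())) == 6

# With lazy=false every connection exists up front: each of the 4 workers
# holds 1 master link plus 3 peer links, i.e. 16 endpoints in total.
rmprocs(workers())
addprocs(4; lazy=false)
@assert sum(asyncmap(n_conns, workers())) == 16
```

The routing in `check_worker_state` preserves the existing convention that higher pids connect to lower pids: a caller with the higher pid runs `exec_conn_func` itself, while a caller with the lower pid asks node 1 to trigger `exec_conn_func` on the peer. This is also why, on a lazy cluster, `nprocs`/`procs`/`id_in_procs` on workers fall back to querying the master, and why `addprocs_locked` throws an `ArgumentError` instead of mixing `lazy=true` and `lazy=false` workers in the same cluster.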