Skip to content

Commit

Permalink
Distributed: Propagate package environment to local workers.
Browse files Browse the repository at this point in the history
Local workers now inherit the package environment of the main process,
i.e. the active project, LOAD_PATH, and DEPOT_PATH. This behavior
can be overridden by passing the new `env` keyword argument, or by
passing `--project` in the `exeflags` keyword argument.

Fixes #28781, and closes #42089.
  • Loading branch information
fredrikekre committed Feb 24, 2022
1 parent 8ae9dd5 commit 0f17581
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 5 deletions.
7 changes: 7 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ Standard library changes

#### Distributed

* The package environment (active project, `LOAD_PATH`, `DEPOT_PATH`) are now propagated
when adding *local* workers (e.g. with `addprocs(N::Int)` or through the `--procs=N`
command line flag) ([#43270]).
* `addprocs` for local workers now accept the `env` keyword argument for passing
environment variables to the workers processes. This was already supported for
remote workers ([#43270]).

#### UUIDs

#### Mmap
Expand Down
1 change: 1 addition & 0 deletions stdlib/Distributed/src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,7 @@ default_addprocs_params() = Dict{Symbol,Any}(
:dir => pwd(),
:exename => joinpath(Sys.BINDIR, julia_exename()),
:exeflags => ``,
:env => [],
:enable_threaded_blas => false,
:lazy => true)

Expand Down
39 changes: 34 additions & 5 deletions stdlib/Distributed/src/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ It is possible to launch multiple processes on a remote host by using a tuple in
workers to be launched on the specified host. Passing `:auto` as the worker count will
launch as many workers as the number of CPU threads on the remote host.
Examples:
**Examples**:
```julia
addprocs([
"remote1", # one worker on 'remote1' logging in with the current username
Expand All @@ -76,7 +76,7 @@ addprocs([
])
```
Keyword arguments:
**Keyword arguments**:
* `tunnel`: if `true` then SSH tunneling will be used to connect to the worker from the
master process. Default is `false`.
Expand Down Expand Up @@ -445,10 +445,17 @@ end
Launch `np` workers on the local host using the in-built `LocalManager`.
*Keyword arguments:*
Local workers inherit the current package environment (i.e., active project,
[`LOAD_PATH`](@ref), and [`DEPOT_PATH`](@ref)) from the main process.
**Keyword arguments**:
- `restrict::Bool`: if `true` (default) binding is restricted to `127.0.0.1`.
- `dir`, `exename`, `exeflags`, `topology`, `lazy`, `enable_threaded_blas`: same effect
- `dir`, `exename`, `exeflags`, `env`, `topology`, `lazy`, `enable_threaded_blas`: same effect
as for `SSHManager`, see documentation for [`addprocs(machines::AbstractVector)`](@ref).
!!! compat "Julia 1.9"
The inheriting of the package environment and the `env` keyword argument were
added in Julia 1.9.
"""
function addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...)
manager = LocalManager(np, restrict)
Expand All @@ -463,10 +470,32 @@ function launch(manager::LocalManager, params::Dict, launched::Array, c::Conditi
exename = params[:exename]
exeflags = params[:exeflags]
bind_to = manager.restrict ? `127.0.0.1` : `$(LPROC.bind_addr)`
env = Dict{String,String}(params[:env])

# TODO: Maybe this belongs in base/initdefs.jl as a package_environment() function
# together with load_path() etc. Might be useful to have when spawning julia
# processes outside of Distributed.jl too.
# JULIA_(LOAD|DEPOT)_PATH are used to populate (LOAD|DEPOT)_PATH on startup,
# but since (LOAD|DEPOT)_PATH might have changed they are re-serialized here.
# Users can opt-out of this by passing `env = ...` to addprocs(...).
pathsep = Sys.iswindows() ? ";" : ":"
if get(env, "JULIA_LOAD_PATH", nothing) === nothing
env["JULIA_LOAD_PATH"] = join(LOAD_PATH, pathsep)
end
if get(env, "JULIA_DEPOT_PATH", nothing) === nothing
env["JULIA_DEPOT_PATH"] = join(DEPOT_PATH, pathsep)
end
# Set the active project on workers using JULIA_PROJECT.
# Users can opt-out of this by (i) passing `env = ...` or (ii) passing
# `--project=...` as `exeflags` to addprocs(...).
project = Base.ACTIVE_PROJECT[]
if project !== nothing && get(env, "JULIA_PROJECT", nothing) === nothing
env["JULIA_PROJECT"] = project
end

for i in 1:manager.np
cmd = `$(julia_cmd(exename)) $exeflags --bind-to $bind_to --worker`
io = open(detach(setenv(cmd, dir=dir)), "r+")
io = open(detach(setenv(addenv(cmd, env), dir=dir)), "r+")
write_cookie(io)

wconfig = WorkerConfig()
Expand Down
107 changes: 107 additions & 0 deletions stdlib/Distributed/test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1739,6 +1739,113 @@ for p in procs()
@test @fetchfrom(p, i27429) == 27429
end

# Propagation of package environments for local workers (#28781)
let julia = `$(Base.julia_cmd()) --startup-file=no`; mktempdir() do tmp
project = mkdir(joinpath(tmp, "project"))
depots = [mkdir(joinpath(tmp, "depot1")), mkdir(joinpath(tmp, "depot2"))]
load_path = [mkdir(joinpath(tmp, "load_path")), "@stdlib", "@"]
pathsep = Sys.iswindows() ? ";" : ":"
env = Dict(
"JULIA_DEPOT_PATH" => join(depots, pathsep),
"JULIA_LOAD_PATH" => join(load_path, pathsep),
)
setupcode = """
using Distributed, Test
@everywhere begin
depot_path() = DEPOT_PATH
load_path() = LOAD_PATH
active_project() = Base.ACTIVE_PROJECT[]
end
"""
testcode = setupcode * """
for w in workers()
@test remotecall_fetch(depot_path, w) == DEPOT_PATH
@test remotecall_fetch(load_path, w) == LOAD_PATH
@test remotecall_fetch(Base.load_path, w) == Base.load_path()
@test remotecall_fetch(active_project, w) == Base.ACTIVE_PROJECT[]
@test remotecall_fetch(Base.active_project, w) == Base.active_project()
end
"""
# No active project
extracode = """
for w in workers()
@test remotecall_fetch(active_project, w) === Base.ACTIVE_PROJECT[] === nothing
end
"""
cmd = setenv(`$(julia) -p1 -e $(testcode * extracode)`, env)
@test success(cmd)
# --project
extracode = """
for w in workers()
@test remotecall_fetch(active_project, w) == Base.ACTIVE_PROJECT[] ==
$(repr(project))
end
"""
cmd = setenv(`$(julia) --project=$(project) -p1 -e $(testcode * extracode)`, env)
@test success(cmd)
# JULIA_PROJECT
cmd = setenv(`$(julia) -p1 -e $(testcode * extracode)`,
(env["JULIA_PROJECT"] = project; env))
@test success(cmd)
# Pkg.activate(...)
activateish = """
Base.ACTIVE_PROJECT[] = $(repr(project))
using Distributed
addprocs(1)
"""
cmd = setenv(`$(julia) -e $(activateish * testcode * extracode)`, env)
@test success(cmd)
# JULIA_(LOAD|DEPOT)_PATH
shufflecode = """
d = reverse(DEPOT_PATH)
append!(empty!(DEPOT_PATH), d)
l = reverse(LOAD_PATH)
append!(empty!(LOAD_PATH), l)
"""
addcode = """
using Distributed
addprocs(1) # after shuffling
"""
extracode = """
for w in workers()
@test remotecall_fetch(load_path, w) == $(repr(reverse(load_path)))
@test remotecall_fetch(depot_path, w) == $(repr(reverse(depots)))
end
"""
cmd = setenv(`$(julia) -e $(shufflecode * addcode * testcode * extracode)`, env)
@test success(cmd)
# Mismatch when shuffling after proc addition
failcode = shufflecode * setupcode * """
for w in workers()
@test remotecall_fetch(load_path, w) == reverse(LOAD_PATH) == $(repr(load_path))
@test remotecall_fetch(depot_path, w) == reverse(DEPOT_PATH) == $(repr(depots))
end
"""
cmd = setenv(`$(julia) -p1 -e $(failcode)`, env)
@test success(cmd)
# Passing env or exeflags to addprocs(...) to override defaults
envcode = """
using Distributed
project = mktempdir()
env = Dict(
"JULIA_LOAD_PATH" => LOAD_PATH[1],
"JULIA_DEPOT_PATH" => DEPOT_PATH[1],
)
addprocs(1; env = env, exeflags = `--project=\$(project)`)
env["JULIA_PROJECT"] = project
addprocs(1; env = env)
""" * setupcode * """
for w in workers()
@test remotecall_fetch(depot_path, w) == [DEPOT_PATH[1]]
@test remotecall_fetch(load_path, w) == [LOAD_PATH[1]]
@test remotecall_fetch(active_project, w) == project
@test remotecall_fetch(Base.active_project, w) == joinpath(project, "Project.toml")
end
"""
cmd = setenv(`$(julia) -e $(envcode)`, env)
@test success(cmd)
end end

include("splitrange.jl")

# Run topology tests last after removing all workers, since a given
Expand Down

0 comments on commit 0f17581

Please sign in to comment.