Skip to content

Commit

Permalink
Replace Nullable{T} with Union{T, Void} or Union{Some{T}, Void} (#23642)
Browse files Browse the repository at this point in the history
Also add coalesce() function to return first non-nothing value and unwrap Some objects. Use the notnothing() function internally where it makes sense to assert that the result is different from nothing.

Use custom MaybeValue wrapper for ProductIterator to work around a performance regression due to type instability (information about whether a value is present or not is carried separately).
  • Loading branch information
nalimilan authored Dec 15, 2017
1 parent 70900b7 commit 03e0da4
Show file tree
Hide file tree
Showing 10 changed files with 110 additions and 125 deletions.
2 changes: 1 addition & 1 deletion src/Distributed.jl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ using Base: Process, Semaphore, JLOptions, AnyDict, buffer_writes, wait_connecte
VERSION_STRING, sync_begin, sync_add, sync_end, async_run_thunk,
binding_module, notify_error, atexit, julia_exename, julia_cmd,
AsyncGenerator, acquire, release, invokelatest,
shell_escape_posixly, uv_error
shell_escape_posixly, uv_error, coalesce, notnothing
using Base.Unicode: isascii, isdigit, isnumeric

# NOTE: clusterserialize.jl imports additional symbols from Base.Serializer for use
Expand Down
119 changes: 53 additions & 66 deletions src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,47 +4,47 @@ abstract type ClusterManager end

mutable struct WorkerConfig
# Common fields relevant to all cluster managers
io::Nullable{IO}
host::Nullable{AbstractString}
port::Nullable{Integer}
io::Union{IO, Void}
host::Union{AbstractString, Void}
port::Union{Integer, Void}

# Used when launching additional workers at a host
count::Nullable{Union{Int, Symbol}}
exename::Nullable{Union{AbstractString, Cmd}}
exeflags::Nullable{Cmd}
count::Union{Int, Symbol, Void}
exename::Union{AbstractString, Cmd, Void}
exeflags::Union{Cmd, Void}

# External cluster managers can use this to store information at a per-worker level
# Can be a dict if multiple fields need to be stored.
userdata::Nullable{Any}
userdata::Any

# SSHManager / SSH tunnel connections to workers
tunnel::Nullable{Bool}
bind_addr::Nullable{AbstractString}
sshflags::Nullable{Cmd}
max_parallel::Nullable{Integer}
tunnel::Union{Bool, Void}
bind_addr::Union{AbstractString, Void}
sshflags::Union{Cmd, Void}
max_parallel::Union{Integer, Void}

# Used by Local/SSH managers
connect_at::Nullable{Any}
connect_at::Any

process::Nullable{Process}
ospid::Nullable{Integer}
process::Union{Process, Void}
ospid::Union{Integer, Void}

# Private dictionary used to store temporary information by Local/SSH managers.
environ::Nullable{Dict}
environ::Union{Dict, Void}

# Connections to be setup depending on the network topology requested
ident::Nullable{Any} # Worker as identified by the Cluster Manager.
ident::Any # Worker as identified by the Cluster Manager.
# List of other worker idents this worker must connect with. Used with topology T_CUSTOM.
connect_idents::Nullable{Array}
connect_idents::Union{Array, Void}

# Run multithreaded blas on worker
enable_threaded_blas::Nullable{Bool}
enable_threaded_blas::Union{Bool, Void}

function WorkerConfig()
wc = new()
for n in 1:length(WorkerConfig.types)
T = eltype(fieldtype(WorkerConfig, n))
setfield!(wc, n, Nullable{T}())
setfield!(wc, n, nothing)
end
wc
end
Expand All @@ -59,18 +59,19 @@ mutable struct Worker
state::WorkerState
c_state::Condition # wait for state changes
ct_time::Float64 # creation time
conn_func::Nullable{Function} # Used to setup connections lazily
conn_func::Any # used to setup connections lazily

r_stream::IO
w_stream::IO
w_serializer::ClusterSerializer # writes can happen from any task hence store the
# serializer as part of the Worker object
manager::ClusterManager
config::WorkerConfig
version::Nullable{VersionNumber} # Julia version of the remote process
version::Union{VersionNumber, Void} # Julia version of the remote process

function Worker(id::Int, r_stream::IO, w_stream::IO, manager::ClusterManager;
version=Nullable{VersionNumber}(), config=WorkerConfig())
version::Union{VersionNumber, Void}=nothing,
config::WorkerConfig=WorkerConfig())
w = Worker(id)
w.r_stream = r_stream
w.w_stream = buffer_writes(w_stream)
Expand All @@ -83,7 +84,7 @@ mutable struct Worker
w
end

Worker(id::Int) = Worker(id, Nullable{Function}())
Worker(id::Int) = Worker(id, nothing)
function Worker(id::Int, conn_func)
@assert id > 0
if haskey(map_pid_wrkr, id)
Expand Down Expand Up @@ -127,13 +128,10 @@ end

exec_conn_func(id::Int) = exec_conn_func(worker_from_id(id))
function exec_conn_func(w::Worker)
if isnull(w.conn_func)
return wait_for_conn(w) # Some other task may be trying to connect at the same time.
end

try
f = get(w.conn_func)
w.conn_func = Nullable{Function}()
f = notnothing(w.conn_func)
# Will be called if some other task tries to connect at the same time.
w.conn_func = () -> wait_for_conn(w)
f()
catch e
w.conn_func = () -> throw(e)
Expand Down Expand Up @@ -174,16 +172,12 @@ worker_timeout() = parse(Float64, get(ENV, "JULIA_WORKER_TIMEOUT", "60.0"))

## worker creation and setup ##
"""
start_worker(out::IO=STDOUT)
start_worker(cookie::AbstractString)
start_worker(out::IO, cookie::AbstractString)
start_worker([out::IO=STDOUT], cookie::AbstractString=readline(STDIN))
`start_worker` is an internal function which is the default entry point for
worker processes connecting via TCP/IP. It sets up the process as a Julia cluster
worker.
If the cookie is unspecified, the worker tries to read it from its STDIN.
host:port information is written to stream `out` (defaults to STDOUT).
The function closes STDIN (after reading the cookie if required), redirects STDERR to STDOUT,
Expand All @@ -192,15 +186,8 @@ line option) and schedules tasks to process incoming TCP connections and request
It does not return.
"""
start_worker(out::IO=STDOUT) = start_worker(out, Nullable{AbstractString}())
start_worker(cookie::AbstractString) = start_worker(STDOUT, Nullable{AbstractString}(cookie))
start_worker(out::IO, cookie::AbstractString) = start_worker(out, Nullable{AbstractString}(cookie))
function start_worker(out::IO, cookie_in::Nullable{AbstractString})
if isnull(cookie_in)
cookie = readline(STDIN)
else
cookie = get(cookie_in)
end
start_worker(cookie::AbstractString=readline(STDIN)) = start_worker(STDOUT, cookie)
function start_worker(out::IO, cookie::AbstractString=readline(STDIN))
close(STDIN) # workers will not use it
redirect_stderr(STDOUT)

Expand Down Expand Up @@ -388,8 +375,8 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
params[:lazy] = false
end

if isnull(PGRP.lazy) || nprocs() == 1
PGRP.lazy = Nullable{Bool}(params[:lazy])
if PGRP.lazy === nothing || nprocs() == 1
PGRP.lazy = params[:lazy]
elseif isclusterlazy() != params[:lazy]
throw(ArgumentError(string("Active workers with lazy=", isclusterlazy(),
". Cannot set lazy=", params[:lazy])))
Expand Down Expand Up @@ -462,9 +449,9 @@ function setup_launched_worker(manager, wconfig, launched_q)
# When starting workers on remote multi-core hosts, `launch` can (optionally) start only one
# process on the remote machine, with a request to start additional workers of the
# same type. This is done by setting an appropriate value to `WorkerConfig.cnt`.
cnt = get(wconfig.count, 1)
cnt = coalesce(wconfig.count, 1)
if cnt === :auto
cnt = get(wconfig.environ)[:cpu_cores]
cnt = wconfig.environ[:cpu_cores]
end
cnt = cnt - 1 # Removing self from the requested number

Expand All @@ -476,8 +463,8 @@ end

function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launched_q)
@sync begin
exename = get(fromconfig.exename)
exeflags = get(fromconfig.exeflags, ``)
exename = notnothing(fromconfig.exename)
exeflags = coalesce(fromconfig.exeflags, ``)
cmd = `$exename $exeflags`

new_addresses = remotecall_fetch(launch_additional, frompid, cnt, cmd)
Expand Down Expand Up @@ -561,10 +548,12 @@ function create_worker(manager, wconfig)

elseif PGRP.topology == :custom
# wait for requested workers to be up before connecting to them.
filterfunc(x) = (x.id != 1) && isdefined(x, :config) && (get(x.config.ident) in get(wconfig.connect_idents, []))
filterfunc(x) = (x.id != 1) && isdefined(x, :config) &&
(notnothing(x.config.ident) in coalesce(wconfig.connect_idents, []))

wlist = filter(filterfunc, PGRP.workers)
while length(wlist) < length(get(wconfig.connect_idents, []))
while wconfig.connect_idents !== nothing &&
length(wlist) < length(wconfig.connect_idents)
sleep(1.0)
wlist = filter(filterfunc, PGRP.workers)
end
Expand All @@ -575,9 +564,13 @@ function create_worker(manager, wconfig)
end
end

all_locs = map(x -> isa(x, Worker) ? (get(x.config.connect_at, ()), x.id) : ((), x.id, true), join_list)
all_locs = map(x -> isa(x, Worker) ?
(coalesce(x.config.connect_at, ()), x.id) :
((), x.id, true),
join_list)
send_connection_hdr(w, true)
join_message = JoinPGRPMsg(w.id, all_locs, PGRP.topology, get(wconfig.enable_threaded_blas, false), isclusterlazy())
enable_threaded_blas = coalesce(wconfig.enable_threaded_blas, false)
join_message = JoinPGRPMsg(w.id, all_locs, PGRP.topology, enable_threaded_blas, isclusterlazy())
send_msg_now(w, MsgHeader(RRID(0,0), ntfy_oid), join_message)

@schedule manage(w.manager, w.id, w.config, :register)
Expand Down Expand Up @@ -679,9 +672,9 @@ mutable struct ProcessGroup
workers::Array{Any,1}
refs::Dict # global references
topology::Symbol
lazy::Nullable{Bool}
lazy::Union{Bool, Void}

ProcessGroup(w::Array{Any,1}) = new("pg-default", w, Dict(), :all_to_all, Nullable{Bool}())
ProcessGroup(w::Array{Any,1}) = new("pg-default", w, Dict(), :all_to_all, nothing)
end
const PGRP = ProcessGroup([])

Expand All @@ -695,23 +688,17 @@ function topology(t)
t
end

function isclusterlazy()
if isnull(PGRP.lazy)
return false
else
return get(PGRP.lazy)
end
end
isclusterlazy() = coalesce(PGRP.lazy, false)

get_bind_addr(pid::Integer) = get_bind_addr(worker_from_id(pid))
get_bind_addr(w::LocalProcess) = LPROC.bind_addr
function get_bind_addr(w::Worker)
if isnull(w.config.bind_addr)
if w.config.bind_addr === nothing
if w.id != myid()
w.config.bind_addr = remotecall_fetch(get_bind_addr, w.id, w.id)
end
end
get(w.config.bind_addr)
w.config.bind_addr
end

# globals
Expand Down Expand Up @@ -1063,8 +1050,8 @@ function check_same_host(pids)
if all(p -> (p==1) || (isa(map_pid_wrkr[p].manager, LocalManager)), pids)
return true
else
first_bind_addr = get(map_pid_wrkr[pids[1]].config.bind_addr)
return all(p -> (p != 1) && (get(map_pid_wrkr[p].config.bind_addr) == first_bind_addr), pids[2:end])
first_bind_addr = notnothing(map_pid_wrkr[pids[1]].config.bind_addr)
return all(p -> (p != 1) && (notnothing(map_pid_wrkr[p].config.bind_addr) == first_bind_addr), pids[2:end])
end
end
end
Expand Down
6 changes: 3 additions & 3 deletions src/macros.jl
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ returning a [`Future`](@ref) to the result.
julia> addprocs(3);
julia> f = @spawn myid()
Future(2, 1, 5, Nullable{Any}())
Future(2, 1, 5, nothing)
julia> fetch(f)
2
julia> f = @spawn myid()
Future(3, 1, 7, Nullable{Any}())
Future(3, 1, 7, nothing)
julia> fetch(f)
3
Expand All @@ -56,7 +56,7 @@ Accepts two arguments, `p` and an expression.
julia> addprocs(1);
julia> f = @spawnat 2 myid()
Future(2, 1, 3, Nullable{Any}())
Future(2, 1, 3, nothing)
julia> fetch(f)
2
Expand Down
36 changes: 18 additions & 18 deletions src/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,10 @@ end

function manage(manager::SSHManager, id::Integer, config::WorkerConfig, op::Symbol)
if op == :interrupt
ospid = get(config.ospid, 0)
if ospid > 0
host = get(config.host)
sshflags = get(config.sshflags)
ospid = config.ospid
if ospid !== nothing
host = notnothing(config.host)
sshflags = notnothing(config.sshflags)
if !success(`ssh -T -a -x -o ClearAllForwardings=yes -n $sshflags $host "kill -2 $ospid"`)
@error "Error sending a Ctrl-C to julia worker $id on $host"
end
Expand Down Expand Up @@ -341,7 +341,7 @@ end

function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Symbol)
if op == :interrupt
kill(get(config.process), 2)
kill(config.process, 2)
end
end

Expand Down Expand Up @@ -387,24 +387,24 @@ ensure that messages are delivered and received completely and in order.
workers.
"""
function connect(manager::ClusterManager, pid::Int, config::WorkerConfig)
if !isnull(config.connect_at)
if config.connect_at !== nothing
# this is a worker-to-worker setup call.
return connect_w2w(pid, config)
end

# master connecting to workers
if !isnull(config.io)
(bind_addr, port) = read_worker_host_port(get(config.io))
pubhost=get(config.host, bind_addr)
if config.io !== nothing
(bind_addr, port) = read_worker_host_port(config.io)
pubhost = coalesce(config.host, bind_addr)
config.host = pubhost
config.port = port
else
pubhost=get(config.host)
port=get(config.port)
bind_addr=get(config.bind_addr, pubhost)
pubhost = notnothing(config.host)
port = notnothing(config.port)
bind_addr = coalesce(config.bind_addr, pubhost)
end

tunnel = get(config.tunnel, false)
tunnel = coalesce(config.tunnel, false)

s = split(pubhost,'@')
user = ""
Expand All @@ -422,11 +422,11 @@ function connect(manager::ClusterManager, pid::Int, config::WorkerConfig)

if tunnel
if !haskey(tunnel_hosts_map, pubhost)
tunnel_hosts_map[pubhost] = Semaphore(get(config.max_parallel, typemax(Int)))
tunnel_hosts_map[pubhost] = Semaphore(coalesce(config.max_parallel, typemax(Int)))
end
sem = tunnel_hosts_map[pubhost]

sshflags = get(config.sshflags)
sshflags = notnothing(config.sshflags)
acquire(sem)
try
(s, bind_addr) = connect_to_worker(pubhost, bind_addr, port, user, sshflags)
Expand All @@ -442,17 +442,17 @@ function connect(manager::ClusterManager, pid::Int, config::WorkerConfig)
# write out a subset of the connect_at required for further worker-worker connection setups
config.connect_at = (bind_addr, port)

if !isnull(config.io)
if config.io !== nothing
let pid = pid
redirect_worker_output(pid, get(config.io))
redirect_worker_output(pid, notnothing(config.io))
end
end

(s, s)
end

function connect_w2w(pid::Int, config::WorkerConfig)
(rhost, rport) = get(config.connect_at)
(rhost, rport) = notnothing(config.connect_at)
config.host = rhost
config.port = rport
(s, bind_addr) = connect_to_worker(rhost, rport)
Expand Down
Loading

0 comments on commit 03e0da4

Please sign in to comment.