WorkerPool and remote() #15073

Merged (1 commit, Mar 9, 2016)
1 change: 1 addition & 0 deletions base/exports.jl
```diff
@@ -1223,6 +1223,7 @@ export
     pmap,
     procs,
     put!,
+    remote,
     remotecall,
     remotecall_fetch,
     remotecall_wait,
```
7 changes: 6 additions & 1 deletion base/multi.jl
```diff
@@ -49,6 +49,7 @@ type JoinPGRPMsg <: AbstractMsg
     self_is_local::Bool
     notify_oid::Tuple
     topology::Symbol
+    worker_pool
 end
 type JoinCompleteMsg <: AbstractMsg
     notify_oid::Tuple
@@ -1045,6 +1046,8 @@ function handle_msg(msg::JoinPGRPMsg, r_stream, w_stream)

     for wt in wait_tasks; wait(wt); end

+    set_default_worker_pool(msg.worker_pool)
+
     send_msg_now(controller, JoinCompleteMsg(msg.notify_oid, Sys.CPU_CORES, getpid()))
 end
@@ -1070,6 +1073,8 @@ function handle_msg(msg::JoinCompleteMsg, r_stream, w_stream)

     ntfy_channel = lookup_ref(msg.notify_oid)
     put!(ntfy_channel, w.id)
+
+    put!(default_worker_pool(), w)
 end

 function disable_threaded_libs()
@@ -1384,7 +1389,7 @@ function create_worker(manager, wconfig)
     end

     all_locs = map(x -> isa(x, Worker) ? (get(x.config.connect_at, ()), x.id, isa(x.manager, LocalManager)) : ((), x.id, true), join_list)
-    send_msg_now(w, JoinPGRPMsg(w.id, all_locs, isa(w.manager, LocalManager), ntfy_oid, PGRP.topology))
+    send_msg_now(w, JoinPGRPMsg(w.id, all_locs, isa(w.manager, LocalManager), ntfy_oid, PGRP.topology, default_worker_pool()))

     @schedule manage(w.manager, w.id, w.config, :register)
     wait(rr_ntfy_join)
```
1 change: 1 addition & 0 deletions base/sysimg.jl
```diff
@@ -226,6 +226,7 @@ include("serialize.jl")
 importall .Serializer
 include("channels.jl")
 include("multi.jl")
+include("workerpool.jl")
 include("managers.jl")

 # code loading
```
75 changes: 75 additions & 0 deletions base/workerpool.jl
```julia
# This file is a part of Julia. License is MIT: http://julialang.org/license

type WorkerPool
    channel::RemoteChannel{Channel{Int}}
end
```

**Contributor:** Need to add a default constructor here to avoid the following build warning:

```
WARNING: Method definition (::Type{Base.WorkerPool})(Any) in module Base at workerpool.jl:5 overwritten at workerpool.jl:17
```


"""
WorkerPool([workers])

Create a WorkerPool from a vector of worker ids.
"""
function WorkerPool(workers=Int[])

# Create a shared queue of available workers...
pool = WorkerPool(RemoteChannel(()->Channel{Int}(128)))

# Add workers to the pool...
for w in workers
put!(pool, w)
end

return pool
end


put!(pool::WorkerPool, w::Int) = put!(pool.channel, w)
put!(pool::WorkerPool, w::Worker) = put!(pool.channel, w.id)

isready(pool::WorkerPool) = isready(pool.channel)

take!(pool::WorkerPool) = take!(pool.channel)


"""
remotecall_fetch(f, pool::WorkerPool, args...)

Call `f(args...)` on one of the workers in `pool`.
"""
function remotecall_fetch(f, pool::WorkerPool, args...)
worker = take!(pool)
try
remotecall_fetch(f, worker, args...)
finally
put!(pool, worker)
end
end
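The pool is in effect a shared FIFO of worker ids: `remotecall_fetch` checks a worker out with `take!`, runs the call, and checks it back in with `put!`, so concurrent callers block until a worker is free. A minimal usage sketch (an assumption-laden illustration, not part of the PR; it presumes a session where `addprocs` succeeds, and uses the `Base.` qualifier since `WorkerPool` itself is not exported by this PR):

```julia
addprocs(2)

# Build a pool over the current workers.
pool = Base.WorkerPool(workers())

# Four concurrent calls against a two-worker pool: two run at a time,
# the other two block in take! until a worker is checked back in.
@sync for i in 1:4
    @async println("task $i ran on worker ", remotecall_fetch(myid, pool))
end
```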
**Contributor:** The `put!(_worker_pool, myid())` will result in a separate message sent to the caller. This can be optimized by doing it on the caller:

```julia
function remotecall_fetch(f, pool::WorkerPool, args...)
    pid = take!(pool.channel)
    try
        remotecall_fetch(f, pid, args...)
    finally
        put!(pool.channel, pid)
    end
end
```

**Author:** Nice one.


"""
default_worker_pool

WorkerPool containing idle `workers()` (used by `remote(f)`).
"""
_default_worker_pool = Nullable{WorkerPool}()
function default_worker_pool()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

default_worker_pool should consist of all current workers (across cluster managers) at any time, i.e., workers are added and removed from the pool as and when they come online/removed. It should not be dependent on having to call addprocs first.

It should be the same list as workers() ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a first step, I've changed default_worker_pool to be empty by default.
For now, you can do append!(default_worker_pool(), workers()) after calling addprocs() or passing -p on the command line.

A future PR could automatically call push!(default_worker_pool(), w) from addprocs. At this point I haven't got my head around the initialisation ordering issues that might crop up...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about

  • doing away with a _worker_pool. Worker pools are created as and when required with a list of workers. WorkerPool() is equivalent to WorkerPool(workers())
  • _default_worker_pool exists on pid 1. A worker is added to it here
    function handle_msg(msg::JoinCompleteMsg, r_stream, w_stream)
  • _default_worker_pool on other workers are initialized via the JoinPGRP message
    type JoinPGRPMsg <: AbstractMsg
  • On other workers the channel referred by it is the same one as on pid 1

if isnull(_default_worker_pool) && myid() == 1
set_default_worker_pool(WorkerPool())
end
return get(_default_worker_pool)
end

function set_default_worker_pool(p::WorkerPool)
global _default_worker_pool = Nullable(p)
end
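Under the `default_worker_pool`/`set_default_worker_pool` definitions above, the default pool is created lazily and only on pid 1 (workers instead receive theirs via `JoinPGRPMsg`). A small sketch of that behavior as read from the code, assuming a fresh master session:

```julia
# The first call on pid 1 finds _default_worker_pool null, installs an
# empty WorkerPool, and caches it in the global Nullable.
pool = Base.default_worker_pool()

# Subsequent calls return the same cached pool object.
@assert pool === Base.default_worker_pool()
```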


"""
remote(f)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs to go into the rst docs if it's going to be exported

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about calling it remotef? it becomes clearer that it returns a function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless there is a solid convention that functions that return a function have an f suffix, I'd rather not change it as this point and introduce new conventions on the fly. It has been remote as proposed in #14843 since Jan without objection.

I'd be happy for it to be changed in a later functions-that-return-functions naming cleanup PR.
(There is probably an argument for a functions-that-return-iterators naming convention too...)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.rst update done


Returns a lambda that executes function `f` on an available worker
using `remotecall_fetch`.
"""
remote(f) = (args...)->remotecall_fetch(f, default_worker_pool(), args...)
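A usage sketch for `remote` (an illustration, not part of the PR; it assumes workers have been added, and with this change they are placed into the default pool automatically as they complete the join handshake):

```julia
addprocs(2)

# remote(f) returns a closure; each invocation ships f to an idle
# worker from the default pool via remotecall_fetch.
which_worker = remote(myid)
println(which_worker())    # id of whichever pooled worker ran it

# Composes naturally with map: each element is evaluated remotely.
doubled = map(remote(x -> 2x), 1:5)
```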
6 changes: 6 additions & 0 deletions doc/stdlib/parallel.rst
```diff
@@ -339,6 +339,12 @@ General Parallel Computing Support

    Perform ``fetch(remotecall(...))`` in one message. Any remote exceptions are captured in a ``RemoteException`` and thrown.

+.. function:: remote(f)
+
+   .. Docstring generated from Julia source
+
+   Returns a lambda that executes function ``f`` on an available worker using ``remotecall_fetch``.
+
 .. function:: put!(RemoteChannel, value)

    .. Docstring generated from Julia source
```
61 changes: 61 additions & 0 deletions test/parallel_exec.jl
```julia
elseif Base.JLOptions().code_coverage == 2
end
addprocs(3; exeflags=`$cov_flag $inline_flag --check-bounds=yes --depwarn=error`)

# Test remote()

let
    pool = Base.default_worker_pool()

    count = 0
    count_condition = Condition()

    function remote_wait(c)
        @async begin
            count += 1
            remote(take!)(c)
            count -= 1
            notify(count_condition)
        end
        yield()
    end

    testchannels = [RemoteChannel() for i in 1:nworkers()]
    testcount = 0
    @test isready(pool) == true
    for c in testchannels
        @test count == testcount
        remote_wait(c)
        testcount += 1
    end
    @test count == testcount
    @test isready(pool) == false

    for c in testchannels
        @test count == testcount
        put!(c, "foo")
        testcount -= 1
        wait(count_condition)
        @test count == testcount
        @test isready(pool) == true
    end

    @test count == 0

    for c in testchannels
        @test count == testcount
        remote_wait(c)
        testcount += 1
    end
    @test count == testcount
    @test isready(pool) == false

    for c in reverse(testchannels)
        @test count == testcount
        put!(c, "foo")
        testcount -= 1
        wait(count_condition)
        @test count == testcount
        @test isready(pool) == true
    end

    @test count == 0
end
```

**Contributor:** May be worthwhile to make these and the tests below work with any number of workers by rewriting like:

```julia
testchannels = [RemoteChannel() for i in 1:nworkers()]
testcount = 0
@test isready(pool) == true
for c in testchannels
    @test count == testcount
    remote_wait(c)
    testcount += 1
end
@test isready(pool) == false
```

This way, if the number of workers changes in the future, we do not have to revisit this block of tests. Also more readable.
**Contributor:** We have had tests hitting upper limits and timing out on CI. Would be better to rewrite these tests with a different strategy so that they do not add 10 seconds to the parallel test run.

```julia
id_me = myid()
id_other = filter(x -> x != id_me, procs())[rand(1:(nprocs()-1))]
```