Skip to content

Commit

Permalink
Refinements per @amitmurthy comments:
Browse files Browse the repository at this point in the history
  • Loading branch information
samoconnor committed Feb 14, 2016
1 parent 625ca03 commit 693e790
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 41 deletions.
1 change: 0 additions & 1 deletion base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1214,7 +1214,6 @@ export
fetch,
init_worker,
interrupt,
isbusy,
isready,
launch,
manage,
Expand Down
67 changes: 37 additions & 30 deletions base/workerpool.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6,49 +6,52 @@ type WorkerPool
end


isbusy(pool::WorkerPool) = !isready(pool.channel)


"""
WorkerPool(workers)
WorkerPool([workers])
Create a WorkerPool from a vector of worker ids.
"""
function WorkerPool(workers::Vector{Int})
function WorkerPool(workers::Vector{Int}=Int[])

# Create a shared queue of workers...
pool = RemoteChannel(()->Channel{Int}(128))
pool = WorkerPool(RemoteChannel(()->Channel{Int}(128)))

# Check that workers are not already part of a pool...
check = () -> if :_worker_pool in names(Main)
# Add workers to the pool...
append!(pool, workers)

return pool
end


isready(pool::WorkerPool) = isready(pool.channel)


# WorkerPool that this worker is part of.
_worker_pool = nothing


function assert_not_in_workerpool()
global _worker_pool
if _worker_pool != nothing
error("Worker $(myid()) already in a WorkerPool!")
end
foreach(fetch, [@spawnat w check() for w in workers])
end


function append!(pool::WorkerPool, workers::Vector{Int})

# Check that workers are not already part of a pool...
foreach(fetch, [@spawnat w assert_not_in_workerpool() for w in workers])

# Put each worker into the pool...
for w in workers
put!(pool, w)
put!(pool.channel, w)
@spawnat w global _worker_pool = pool
end

WorkerPool(pool)
end


"""
WorkerPool(n)
Launches `n` workers using the in-built LocalManager.
"""
WorkerPool(n::Integer) = WorkerPool(addprocs(n))


"""
WorkerPool()
Equivalent to WorkerPool(CPU_CORES)
"""
WorkerPool() = WorkerPool(addprocs())
push!(pool::WorkerPool, worker::Int...) = append!(pool, [worker...])


"""
Expand All @@ -57,21 +60,25 @@ WorkerPool() = WorkerPool(addprocs())
Call `f(args...)` on one of the workers in `pool`.
"""
function remotecall_fetch(f, pool::WorkerPool, args...)
l = (args...)->try f(args...) finally put!(_worker_pool, myid()) end
remotecall_fetch(l, take!(pool.channel), args...)
worker = take!(pool.channel)
try
remotecall_fetch(f, worker, args...)
finally
put!(pool.channel, worker)
end
end


"""
default_worker_pool()
default_worker_pool
Built-in WorkerPool (used by `remote(f)`).
"""
global _default_worker_pool = nothing
function default_worker_pool()
global _default_worker_pool
if _default_worker_pool == nothing
_default_worker_pool = WorkerPool(workers())
_default_worker_pool = WorkerPool()
end
return _default_worker_pool
end
Expand Down
22 changes: 12 additions & 10 deletions test/parallel_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ end
addprocs(3; exeflags=`$cov_flag $inline_flag --check-bounds=yes --depwarn=error`)

# Test remote()
let

append!(Base.default_worker_pool(), workers())

let
pool = Base.default_worker_pool()

r = nothing
Expand All @@ -32,23 +34,23 @@ let

t1 = now()

@test isbusy(pool) == false
@test isready(pool) == true
@test count == 0
remote_sleep(10)
@test count == 1
# w1: 10s

@test isbusy(pool) == false
@test isready(pool) == true
remote_sleep(2)
@test count == 2
# w1: 10, w2: 2s

@test isbusy(pool) == false
@test isready(pool) == true
remote_sleep(4)
@test count == 3
@test isbusy(pool) == true
@test isready(pool) == false
remote_sleep(4)
@test isbusy(pool) == true
@test isready(pool) == false
# w1: 10s, w2: 2s, w3: 4s

t2 = now()
Expand All @@ -59,22 +61,22 @@ let
# w1: 9s, w2: 1s, w3: 3s ...
# w1: 8s, w2: !!, w3: 2s
@test count == 3
@test isbusy(pool) == true
@test isready(pool) == false
# w1: 8s, w2: 4s, w3: 2s ...
# w1: 7s, w2: 3s, w3: 1s ...
# w1: 6s, w2: 2s, w3: !! ...
sleep(2)
@test count == 2
@test isbusy(pool) == false
@test isready(pool) == true
sleep(3) # w1: 6s, w2: 2s ...
@test count == 1
@test isbusy(pool) == false
@test isready(pool) == true
# w1: 5s, w2: 1s ...
# w1: 4s, w2: !!
# w1: 3s ...
sleep(4) # w1: 2s ...
@test count == 0
@test isbusy(pool) == false
@test isready(pool) == true
# w1: 1s ...
# w1: !!
# ...
Expand Down

2 comments on commit 693e790

@amitmurthy
Copy link

@amitmurthy amitmurthy commented on 693e790 Feb 16, 2016 via email

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@samoconnor
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bump :)

Please sign in to comment.