Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python-style imap() #4601

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1101,6 +1101,7 @@ export
nprocs,
nworkers,
pmap,
ipmap,
procs,
put,
remotecall,
Expand Down
78 changes: 78 additions & 0 deletions base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
## pmap(func, lst) -
## call a function on each element of lst (some 1-d thing), in
## parallel.
## ipmap(func, lst) -
##    same as pmap, but returns immediately instead of blocking the caller;
##    the returned object is iterable and yields the results in order.
##
## RemoteRef() - create an uninitialized RemoteRef on the local processor
##
Expand Down Expand Up @@ -1407,6 +1409,82 @@ function pmap(f, lsts...; err_retry=true, err_stop=false)
[results[x] for x in 1:nextidx]
end

# Handle returned by `ipmap(f, lsts...)`: an iterable view over results that
# are filled in asynchronously by the per-worker tasks.
type ipmap
# Completed work, keyed by task index (1-based, in submission order).
results::Dict{Int, RemoteRef}
# Number of results the iterator will produce; `done` compares the state to it.
n::Int
# Failed tasklets waiting to be retried; entries are pushed as
# `(idx, fvals, ex)` tuples by the worker tasks.
retryqueue
end

# Iteration state is the index of the next result to hand out; begin at 1.
function start(x::ipmap)
    return 1
end
# Iteration is finished once the state index has moved past the last result.
done(x::ipmap, state::Int) = state > x.n
# Return `(value, state+1)` for the result at index `state`.
#
# If that result has not been produced yet, cooperatively yield to the worker
# tasks until it appears. While waiting, every tasklet still sitting in the
# retry queue is published as a failed result so the consumer is not stuck
# forever when no worker is left to retry it.
#
# `x.results` is a `Dict{Int, RemoteRef}`, so the exception cannot be stored
# directly (the Dict would try to convert it to a `RemoteRef` and fail); it is
# wrapped in a local `RemoteRef`, and the `fetch` below then returns the
# exception object as the value for that index.
function next(x::ipmap, state::Int)
    while !haskey(x.results, state)
        for failure in x.retryqueue
            # Queue entries are (idx, fvals, ex) tuples.
            failed_idx = failure[1]
            # Publish each failure once; do not allocate a new ref on every
            # spin of the wait loop.
            if !haskey(x.results, failed_idx)
                failed = RemoteRef()
                put(failed, failure[3])
                x.results[failed_idx] = failed
            end
        end
        yield()
    end
    return (fetch(x.results[state]), state+1)
end

# ipmap(f, lsts...; err_retry=true, err_stop=false)
#
# Non-blocking counterpart of `pmap`: starts one asynchronous feeder task per
# worker and returns immediately with an `ipmap` object. Iterating over that
# object yields the results in submission order, waiting for each as needed.
#
#   f          function applied to one element from each of `lsts`
#   lsts       one or more iterables, consumed in lock-step
#   err_retry  if true, a failed tasklet is queued so another worker can retry
#              it; if false, the exception is stored as that tasklet's result
#   err_stop   if true, no new tasklets are handed out after the first error
#
# NOTE(review): the number of results is taken from `length(lsts[1])` only,
# while tasklets stop being produced as soon as ANY list is exhausted; lists of
# unequal length therefore look like they leave the iterator waiting - confirm.
function ipmap(f::Function, lsts...; err_retry=true, err_stop=false)
    len = length(lsts)
    ret_val = ipmap(Dict{Int, RemoteRef}(), length(lsts[1]), {})

    # Shared flag set by any feeder task that hits an error.
    task_in_err = false
    is_task_in_error() = task_in_err
    set_task_in_error() = (task_in_err = true)

    # Monotonic counter assigning each tasklet its result index.
    nextidx = 0
    getnextidx() = (nextidx += 1)

    # One iteration state per input list.
    states = [start(lsts[idx]) for idx in 1:len]

    # Hand out the next unit of work: a fresh (idx, args) pair while all lists
    # have elements left, otherwise a queued retry, otherwise `nothing`.
    function getnext_tasklet()
        if is_task_in_error() && err_stop
            return nothing
        elseif all([!done(lsts[idx],states[idx]) for idx in 1:len])
            nxts = [next(lsts[idx],states[idx]) for idx in 1:len]
            for idx in 1:len
                states[idx] = nxts[idx][2]
            end
            nxtvals = [x[1] for x in nxts]
            return (getnextidx(), nxtvals)
        elseif !isempty(ret_val.retryqueue)
            return shift!(ret_val.retryqueue)
        else
            return nothing
        end
    end

    for wpid in workers()
        @async begin
            tasklet = getnext_tasklet()
            while tasklet !== nothing
                # Retry-queue entries carry a third element (the exception),
                # which this destructuring ignores.
                (idx, fvals) = tasklet
                try
                    result = remotecall_wait(wpid, f, fvals...)
                    if isa(result, Exception)
                        ((wpid == myid()) ? rethrow(result) : throw(result))
                    else
                        ret_val.results[idx] = result
                    end
                catch ex
                    if err_retry
                        push!(ret_val.retryqueue, (idx,fvals, ex))
                    else
                        # Record the exception as this tasklet's result. The
                        # results Dict holds RemoteRefs, so wrap the exception
                        # in a local ref; fetching it yields the exception.
                        failed = RemoteRef()
                        put(failed, ex)
                        ret_val.results[idx] = failed
                    end
                    set_task_in_error()
                    break # remove this worker from accepting any more tasks
                end

                tasklet = getnext_tasklet()
            end
        end
    end

    return ret_val
end

# Statically split range [1,N] into equal sized chunks for np processors
function splitrange(N::Int, np::Int)
each = div(N,np)
Expand Down
3 changes: 3 additions & 0 deletions doc/stdlib/base.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4243,7 +4243,10 @@ Parallel Computing
If ``err_retry`` is true, it retries a failed application of ``f`` on a different worker.
If ``err_stop`` is true, it takes precedence over the value of ``err_retry`` and ``pmap`` stops execution on the first error.

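.. function:: ipmap(f, lsts...; err_retry=true, err_stop=false)

Same as ``pmap``, but returns immediately instead of blocking. The returned object is an iterable collection; iterating over it yields the results in order, waiting for each one to become available.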

.. function:: remotecall(id, func, args...)

Call a function asynchronously on the given arguments on the specified processor. Returns a ``RemoteRef``.
Expand Down