diff --git a/base/exports.jl b/base/exports.jl
index 61577108c702a6..76beda550154a8 100644
--- a/base/exports.jl
+++ b/base/exports.jl
@@ -1210,8 +1210,11 @@ export
 
 # multiprocessing
     addprocs,
+    amap,
+    amap!,
     ClusterManager,
     fetch,
+    imap,
     init_worker,
     interrupt,
     isready,
diff --git a/base/mapiterator.jl b/base/mapiterator.jl
new file mode 100644
index 00000000000000..4ab2a29f41f62b
--- /dev/null
+++ b/base/mapiterator.jl
@@ -0,0 +1,249 @@
+# This file is a part of Julia. License is MIT: http://julialang.org/license
+
+#-------------------------------------------------------------------------------
+# MapItr
+#
+# Simple synchronous map iterator.
+#
+#     collect(MapItr(f, c...)) == map(f, c...)
+#
+#-------------------------------------------------------------------------------
+
+import Base: start, done, next
+
+# Lazily applies `f` to the argument tuples obtained by zipping the source
+# collections.
+#
+#   f       - function or type (constructor) called with each argument tuple
+#   arg_itr - zip(c...) over the source collections
+type MapItr
+    f::Union{Function,Type}
+    arg_itr
+
+    # Inner constructor: always zip the sources.
+    #
+    # When this was an outer method, a call with a single collection
+    # (`MapItr(f, c)`) dispatched to the more specific default two-argument
+    # constructor instead, so `c` was stored un-zipped and next() splatted
+    # each *element* into `f` (e.g. a string element was passed as separate
+    # characters).
+    MapItr(f, c...) = new(f, zip(c...))
+end
+
+
+# The iteration state is the state of the underlying zip iterator.
+start(itr::MapItr) = start(itr.arg_itr)
+
+done(itr::MapItr, state) = done(itr.arg_itr, state)
+
+# Fetch the next tuple of arguments and return `(f(args...), newstate)`.
+function next(itr::MapItr, state)
+    args, state = next(itr.arg_itr, state)
+    return itr.f(args...), state
+end
+
+
+
+#-------------------------------------------------------------------------------
+# AsyncMapItr
+#
+# Asynchronous map iterator.
+# Creates a @sync block (between start() and done()).
+# Executes mapped function calls using @async.
+# Function results are asynchronously stored in "results" collection
+# (iterator returns nothing).
+#
+#     for task in AsyncMapItr(f, results, c...) end
+#
+#     ==
+#
+#     map!(f, results, c...)
+#
+# The maximum number of concurrent @async tasks can be set with the
+# "ntasks" option. e.g.
+#
+#     AsyncMapItr(f, results, c...; ntasks=10)
+#
+#-------------------------------------------------------------------------------
+
+
+# Iterator that launches one @async task per zipped argument tuple.
+#
+#   f        - callable applied to each tuple of arguments
+#   results  - indexable collection; the task for the i-th tuple stores its
+#              value in results[i]
+#   arg_enum - enumerate(zip(c...)) over the source collections
+#   ntasks   - maximum number of tasks allowed to be in flight at once
+type AsyncMapItr
+    f
+    results
+    arg_enum::Enumerate
+    ntasks::Int
+
+    # Inner constructor (replaces the default field-wise constructor).
+    #
+    # As an outer method, the positional call with exactly two collections,
+    # `AsyncMapItr(f, results, a, b)`, matched the default four-field
+    # constructor and failed trying to convert `a` to an Enumerate.
+    #
+    # `ntasks` defaults to 100 when omitted or `nothing`. Values below 1 are
+    # rejected: with a limit of 0 the iterator would be "busy" before any
+    # task exists and next() would wait forever.
+    function AsyncMapItr(f, results, c...; ntasks=nothing)
+        if ntasks === nothing
+            ntasks = 100
+        end
+        if ntasks < 1
+            throw(ArgumentError("ntasks must be at least 1, got $ntasks"))
+        end
+        return new(f, results, enumerate(zip(c...)), ntasks)
+    end
+end
+
+
+# Mutable iteration state for AsyncMapItr.
+#
+#   enum_state   - state of the `arg_enum` iterator
+#   active_count - tasks launched by next() that have not finished yet
+#   task_done    - condition notified every time a task finishes
+#   done         - set once the source is exhausted and the @sync block has
+#                  been closed
+type AsyncMapState
+    enum_state
+    active_count::Int
+    task_done::Condition
+    done::Bool
+end
+
+
+# Busy if the maximum number of concurrent tasks is running.
+function isbusy(itr::AsyncMapItr, state::AsyncMapState)
+    return state.active_count >= itr.ntasks
+end
+
+
+# Wait for @async task to end.
+wait(state::AsyncMapState) = wait(state.task_done)
+
+
+# Open a @sync block and initialise iterator state.
+function start(itr::AsyncMapItr)
+    Base.sync_begin()
+    return AsyncMapState(start(itr.arg_enum), 0, Condition(), false)
+end
+
+# Close @sync block when iterator is done.
+# The block is closed exactly once: `state.done` is latched before
+# sync_end() is called, so later calls only report the latched flag.
+function done(itr::AsyncMapItr, state::AsyncMapState)
+    if !state.done && done(itr.arg_enum, state.enum_state)
+        state.done = true
+        Base.sync_end()
+    end
+    return state.done
+end
+
+# Launch the task for the next argument tuple.
+# Returns `(nothing, state)`; the mapped value is written to
+# `itr.results[i]` by the task itself.
+function next(itr::AsyncMapItr, state::AsyncMapState)
+
+    # Wait if the maximum number of concurrent tasks are already running...
+    while isbusy(itr, state)
+        wait(state)
+    end
+
+    # Get index and mapped function arguments from enumeration iterator...
+    (i, args), state.enum_state = next(itr.arg_enum, state.enum_state)
+
+    # Execute function call and save result asynchronously...
+    @async begin
+        try
+            itr.results[i] = itr.f(args...)
+        finally
+            # Release the slot and wake any waiter even when `f` (or the
+            # store into `results`) throws; otherwise a failed task leaves
+            # the iterator permanently busy and the waiter is never notified.
+            state.active_count -= 1
+            notify(state.task_done, nothing)
+        end
+    end
+
+    # Count number of concurrent tasks...
+    state.active_count += 1
+
+    return (nothing, state)
+end
+
+
+
+#-------------------------------------------------------------------------------
+# StreamMapItr
+#
+# Streaming map iterator.
+# Applies mapped function asynchronously and returns results as they become
+# available.
+#
+#     collect(StreamMapItr(f, c...)) == map(f, c...)
+#
+# The maximum number of concurrent @async tasks can be set with the
+# "ntasks" option. e.g.
+#
+#     StreamMapItr(f, c...; ntasks=10)
+#
+#-------------------------------------------------------------------------------
+
+
+# Thin wrapper around an AsyncMapItr whose `results` collection is used as
+# a buffer from which values are handed out one at a time.
+type StreamMapItr
+    async_itr::AsyncMapItr
+end
+
+# Build the wrapped AsyncMapItr with an empty Dict{Int,Any} as its result
+# buffer; `ntasks` is forwarded untouched.
+function StreamMapItr(f, c...; ntasks=nothing)
+    buffer = Dict{Int,Any}()
+    source = AsyncMapItr(f, buffer, c...; ntasks=ntasks)
+    return StreamMapItr(source)
+end
+
+
+# Iteration state.
+#   i           - index of the result most recently requested by next()
+#   async_state - state of the wrapped AsyncMapItr
+type StreamMapState
+    i::Int
+    async_state::AsyncMapState
+end
+
+
+function start(itr::StreamMapItr)
+    return StreamMapState(0, start(itr.async_itr))
+end
+
+# Done when source async iterator is done and all results have been consumed.
+function done(itr::StreamMapItr, state::StreamMapState)
+    source = itr.async_itr
+    if !done(source, state.async_state)
+        return false
+    end
+    return isempty(source.results)
+end
+
+
+# Pump the source async iterator if it is not already busy...
+# Returns true when one more task was launched, false when the source is
+# busy or exhausted. The busy test comes first so that the source's done()
+# is only consulted when a slot is free.
+function pump_source(itr::StreamMapItr, state::StreamMapState)
+    source = itr.async_itr
+    if isbusy(source, state.async_state)
+        return false
+    end
+    if done(source, state.async_state)
+        return false
+    end
+    ignored, state.async_state = next(source, state.async_state)
+    return true
+end
+
+# Return the result for the next index, in source order, removing it from
+# the buffer.
+function next(itr::StreamMapItr, state::StreamMapState)
+
+    state.i += 1
+    idx = state.i
+
+    buffer = itr.async_itr.results
+    while !haskey(buffer, idx)
+
+        # Keep launching tasks while the source accepts them...
+        if pump_source(itr, state)
+            continue
+        end
+
+        # ...otherwise wait for results to become available.
+        if !haskey(buffer, idx)
+            wait(state.async_state)
+        end
+    end
+    value = buffer[idx]
+    delete!(buffer, idx)
+
+    return (value, state)
+end
+
+
+
+#-------------------------------------------------------------------------------
+# Interface: amap and imap
+#-------------------------------------------------------------------------------
+
+"""
+    amap(f, c...; ntasks=100) -> collection
+
+Transform collection c by applying f to each element using at most
+100 asynchronous tasks. For multiple collection arguments, apply f
+elementwise.
+Note: `amap(f, c...; ntasks=1)` is equivalent to `map(f, c...)`.
+"""
+function amap(f, c...; kv...)
+    # Keyword arguments are forwarded verbatim to imap.
+    return collect(imap(f, c...; kv...))
+end
+
+
+"""
+    imap(f, c...; ntasks=100) -> iterator
+
+Apply f to each element of c using at most `ntasks` asynchronous tasks
+(100 by default).
+For multiple collection arguments, apply f elementwise.
+Note: `collect(imap(f, c...; ntasks=1))` is equivalent to `map(f, c...)`.
+"""
+function imap(f, c...; ntasks=nothing)
+    return StreamMapItr(f, c...; ntasks=ntasks)
+end
+
+
+"""
+    amap!(function, collection; ntasks=100)
+
+In-place version of [`amap`](:func:`amap`).
+"""
+function amap! end
+
+"""
+    amap!(function, destination, collection...; ntasks=100)
+
+Like [`amap`](:func:`amap`), but stores the result in `destination` rather than a new collection.
+`destination` must be at least as large as the first collection.
+"""
+function amap!(f, c...; ntasks=nothing)
+
+    # The first positional collection receives the results. When it is the
+    # only one it also serves as the source (in-place form).
+    destination = c[1]
+    sources = length(c) > 1 ? c[2:end] : c
+
+    # Drive the asynchronous iterator to completion; it yields nothing and
+    # writes each result into `destination` itself.
+    itr = AsyncMapItr(f, destination, sources...; ntasks=ntasks)
+    state = start(itr)
+    while !done(itr, state)
+        ignored, state = next(itr, state)
+    end
+
+    return destination
+end
diff --git a/base/sysimg.jl b/base/sysimg.jl
index a26eecb7c23fdb..8e0c4f1a50d96d 100644
--- a/base/sysimg.jl
+++ b/base/sysimg.jl
@@ -227,6 +227,7 @@ importall .Serializer
 include("channels.jl")
 include("multi.jl")
 include("managers.jl")
+include("mapiterator.jl")
 
 # code loading
 include("loading.jl")
diff --git a/doc/stdlib/parallel.rst b/doc/stdlib/parallel.rst
index 2174c42b99bca9..37d2514b237a2b 100644
--- a/doc/stdlib/parallel.rst
+++ b/doc/stdlib/parallel.rst
@@ -253,6 +253,30 @@ General Parallel Computing Support
 
    Get the id of the current process.
 
+.. function:: imap(f, c...; ntasks=100) -> iterator
+
+   .. Docstring generated from Julia source
+
+   Apply f to each element of c using at most ``ntasks`` asynchronous tasks (100 by default). For multiple collection arguments, apply f elementwise. Note: ``collect(imap(f, c...; ntasks=1))`` is equivalent to ``map(f, c...)``\ .
+
+.. function:: amap(f, c...; ntasks=100) -> collection
+
+   .. 
Docstring generated from Julia source + + Transform collection c by applying f to each element using at most 100 asynchronous tasks. For multiple collection arguments, apply f elementwise. Note: ``amap(f, c...; ntasks=1)`` is equivalent to ``map(f, c...)``\ . + +.. function:: amap!(function, collection; ntasks=100) + + .. Docstring generated from Julia source + + In-place version of :func:`amap`\ . + +.. function:: amap!(function, destination, collection...; ntasks=100) + + .. Docstring generated from Julia source + + Like :func:`amap`\ , but stores the result in ``destination`` rather than a new collection. ``destination`` must be at least as large as the first collection. + .. function:: pmap(f, lsts...; err_retry=true, err_stop=false, pids=workers()) .. Docstring generated from Julia source diff --git a/test/abstractarray.jl b/test/abstractarray.jl index 363ad822840d1d..b516fc8954f383 100644 --- a/test/abstractarray.jl +++ b/test/abstractarray.jl @@ -405,6 +405,7 @@ function test_map(::Type{TestAbstractArray}) f(x) = x + 1 I = GenericIterator{10}() @test map(f, I) == Any[2:11...] + @test amap(f, I) == Any[2:11...] # AbstractArray map for 2 arg case f(x, y) = x + y @@ -413,15 +414,19 @@ function test_map(::Type{TestAbstractArray}) C = Float64[1:10...] @test Base.map_to!(f, 1, A, B, C) == Real[ 2 * i for i in 1:10 ] @test map(f, Int[], Float64[]) == Float64[] + @test amap(f, Int[], Float64[]) == Float64[] # AbstractArray map for N-arg case f(x, y, z) = x + y + z D = Float64[1:10...] @test map!(f, A, B, C, D) == Int[ 3 * i for i in 1:10 ] + @test amap!(f, A, B, C, D) == Int[ 3 * i for i in 1:10 ] @test Base.map_to_n!(f, 1, A, (B, C, D)) == Real[ 3 * i for i in 1:10 ] @test map(f, B, C, D) == Float64[ 3 * i for i in 1:10 ] + @test amap(f, B, C, D) == Float64[ 3 * i for i in 1:10 ] @test map(f, Int[], Int[], Complex{Int}[]) == Number[] + @test amap(f, Int[], Int[], Complex{Int}[]) == Number[] end function test_map_promote(::Type{TestAbstractArray})