map() using @async #14515

Closed
samoconnor opened this issue Dec 31, 2015 · 6 comments

Comments

@samoconnor
Contributor

Below is the amap() function from http://github.com/samoconnor/AWSLambda.jl

It is not trying to be like pmap() (which deals with dispatching jobs to multiple compute nodes).
It is just trying to be like map() but with @async invocation of f().

This is a big performance win whenever f() blocks on network, disk, IPC or similar I/O, because the calls wait concurrently instead of one after another.

Should I submit a pull request to add this to multi.jl, or to abstractarray.jl?

function amap(f, l)

    count = length(l)

    results = Array{Any,1}(count)
    fill!(results, nothing)

    @sync begin
        for (i,v) in enumerate(l)
            @async begin
                results[i] = f(v...)
            end
        end
    end

    return results
end
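For readers on a current Julia release, the same idea can be sketched roughly as follows. This is an illustration only, assuming Julia 1.x: `amap_sketch` and `slow_add` are hypothetical names that do not appear in AWSLambda.jl, and `sleep` stands in for a blocking network call.

```julia
# Sketch of the amap() idea for Julia 1.x (names here are hypothetical).
function amap_sketch(f, l)
    # Julia 1.x requires `undef` when allocating an uninitialised vector.
    results = Vector{Any}(undef, length(l))
    # @sync waits until every task started by @async below has finished.
    @sync for (i, v) in enumerate(l)
        @async results[i] = f(v...)   # splat each element, as the original does
    end
    return results
end

# Stand-in for a blocking call: each invocation sleeps before returning.
slow_add(a, b) = (sleep(0.2); a + b)

# The ten sleeps overlap because each call runs in its own task.
r = amap_sketch(slow_add, [(i, i) for i in 1:10])

# Base in Julia 1.x also exports asyncmap, which passes each element
# as a single argument, so the tuple is splatted explicitly here.
r2 = asyncmap(t -> slow_add(t...), [(i, i) for i in 1:10])
```

Results land in input order because each task writes to its own index, regardless of which task finishes first.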

In the following example, amap() opens 100 parallel network connections to execute 100 parallel calls to an AWS Lambda function.

using AWSLambda

f = @lambda aws function foo(a, b)
    ... do computation...
    return result
end

r = amap(f, [(i,i) for i = 1:100])
@tkelman
Contributor

tkelman commented Dec 31, 2015

Since it's <15 lines of code, does it need to be added to Base?

@Ismael-VC
Contributor

@tkelman could you please tell me what's the minimum number of LOC that a PR should have in order to be considered into Base?

@tkelman
Contributor

tkelman commented Dec 31, 2015

It's not strictly a matter of lines of code, but code should really only be added to Base if it's worth making all other code that's written in the language effectively depend on it, or if it requires changes to the C runtime. That's a pretty high bar, and things that are pure Julia, or wrapping optional libraries, should be moving out of Base over time.

Unless there are other places currently in Base that are doing exactly this and could be refactored for brevity if we choose to assign an exported name to async map, I don't think it would be widely used enough to absolutely need to be here (along with tests, documentation, and indefinite maintenance going forward). A small AsyncUtils package might be worth separating out, to potentially prove me wrong here.

@samoconnor
Contributor Author

FWIW, I've posted some map() refactoring experiments here: https://github.com/samoconnor/AsyncMap.jl

Related: pmap() bug fix #12943 and followup: #12943 (comment)

@samoconnor
Contributor Author

Progress... (See https://github.com/samoconnor/AsyncMap.jl/commit/292e6bb11ee27cbc8f0e1c5d1702d503fa14453b)

imap() (iterator map) is a convenience wrapper for StreamMapItr.

e.g., a three-stage processing pipeline with input from an infinite iterator:

for r in imap(format, imap(analyse, imap(normalise, sqs_messages(queue); max=10)))
    println(r)
end

This will read up to 10 messages from the queue in parallel.
normalise(), analyse() and format() will all run asynchronously to
process the data as it becomes available.
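The pipeline behaviour described above can be approximated with plain Channels. The sketch below is not the StreamMapItr implementation from AsyncMap.jl. It assumes Julia 1.3 or later, `stage` is a hypothetical helper, and unlike a real imap() it does not preserve input order.

```julia
# Hypothetical helper: run f over `input` with up to `max` concurrent tasks
# and expose the results lazily through a Channel.
function stage(f, input; max=10)
    Channel{Any}(max) do out
        # Feed the input lazily; put! blocks once `max` items are waiting,
        # so an infinite iterator is only consumed as fast as it is processed.
        src = Channel{Any}(max) do c
            for x in input
                put!(c, x)
            end
        end
        # `max` worker tasks pull from src and push results downstream.
        @sync for _ in 1:max
            @async for x in src
                put!(out, f(x))
            end
        end
    end
end

# Two chained stages over a finite input, with sleep standing in for blocking I/O.
double(x) = (sleep(0.01); 2x)
increment(x) = (sleep(0.01); x + 1)
out = collect(stage(increment, stage(double, 1:20)))
```

Because each stage is itself an iterator, stages compose by nesting, and a downstream stage starts work as soon as the upstream one yields its first item.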

@samoconnor
Contributor Author

Closing in favour of #14843

3 participants