Configure pmap behavior via keyword args #15975
Conversation
It is a WIP, I have just outlined the approach. Please don't review the code in detail yet. Have put it out to start a discussion on the feature/interface.
The flexibility to specify whether it's distributed or not and the number of tasks on each worker would be great! Two questions / suggestions: my use case is a shared cluster with a high load. Can workers be dynamically added to the pool as they become available? So in code, something like this:
```julia
while true
    pool.count == 0 && throw(ErrorException("No active worker available in pool"))
    worker = take!(pool.channel)
    if worker in procs()
```
How can it happen that `worker` is not in `procs()`?
Between the time a worker is added back to the pool and it is picked up again, the worker may die, the host may die, or the network to that particular host may fail. And `retry` will just end up retrying against a failed worker and adding it back to the pool again and again. If the connection to a worker is lost, it is removed from `procs()`.
Would it be better to remove the worker from `default_worker_pool` at the same time that it is removed from `procs()`?
Yes, we should do that too. The check here is still required as the worker pool used may not always be the `default_worker_pool`.
I guess that makes sense. Perhaps there should be some kind of worker death notification mechanism or callback so that custom worker pool implementations can register it and deal with cleaning up dead workers for themselves.
I think the thing that is making me nervous is having code that detects an inconsistency and cleans it up. It makes it seem like there is a source of inconsistency that is not fully understood.
A few questions... What are the pros and cons of the alternate interfaces?

```julia
results = pmap(p -> remotecall_fetch(foo, p, args...), workers(); distributed=false)
results = asyncmap(p -> remotecall_fetch(foo, p, args...), workers())

results = pmap(f, c, on_error = ex -> ex)
results = pmap(x -> try f(x) catch ex end, c)

results = pmap(f, c, batch_size = 1)
results = asyncmap(remote(f), c)
```
Would prefer ...
If the computation time is much higher than the communication time, a ...
Yes, I agree, they should be consistent as much as possible.
My thinking is along the lines that ...
However, if there is a reasonable algorithm for handling network / cluster errors in a remote call then perhaps it should be handled by ...

I'm not necessarily against adding kw args to manipulate this kind of behaviour, but it seems that it would be better to start with a simple set of composable functions and add the kw args later if needed, after there has been time to see some real-world usage, i.e. if there is a verbose pattern that gets repeated a lot.

I like the idea that ...
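For reference, a rough sketch of how such compositions might look. The `pmap_*` helper names are placeholders, not part of this PR, and on current Julia `remote` and `workers` live in the Distributed stdlib:

```julia
using Distributed  # needed on Julia 1.x; at the time of this PR these were in Base

# distributed=false is roughly a task-level asyncmap in the current process:
pmap_local(f, c) = asyncmap(f, c)

# batch_size=1 over remote workers is roughly asyncmap over remote(f):
pmap_remote(f, c) = asyncmap(remote(f), c)

# on_error=handler is roughly wrapping the call in a try/catch:
pmap_onerror(f, c, handler) = asyncmap(x -> try remote(f)(x) catch e; handler(e) end, c)
```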
I would argue that ...
If computation time is much higher than the communication time, won't the overhead of batching be negligible anyway? Perhaps I'm missing a nuance here...
It seems to me that the current behaviour (auto batching) won't be surprising because it should be completely invisible to the user. The results are always returned in order in a single collection. (If there is a use-case where the auto batching has surprising results, we should adjust the auto-batching algorithm.)

The batching behaviour was added based on your suggestion that "having pmap process chunks of the list at a time will reduce network roundtrips". My feeling is that if you are not comfortable with having batching as a default ...

However, I think the batching is probably a win in many cases and probably has no significant overhead in most others. I would suggest waiting until there is a real-world performance problem before trying to make it user-tunable through kw args. If the auto-tuning can be improved over time, then everyone benefits. If there are a bunch of kw args for fine-tuning then only those users who take the time to tune them will benefit.
It seems that there is a trade-off decision here between "Do we need two functions for something quite similar?" and "Do we need all these kw options?". What if the remote call functionality was removed from ..., i.e. ...

This seems to achieve both goals: only one function; and not needing the "distributed" kw option. The documentation could tell a story something like this: "You can use ...
The majority of users would use that API incorrectly and be surprised when ...
Keyword args make it easier to customize program behavior via config files / environment variables. We could have both: expose the composable functions as well as the kw args.

While batching is efficient when the ratio of jobs to workers is high, for example 100 jobs over 10 workers, consider the case where 10 jobs are split into 5 batches over 5 workers and each job takes a varied amount of processing time. Say the slowest job is 10 times as slow as the fastest. Now, if a worker ends up with 2 of the slowest jobs in a single batch, it will just slow down the overall time taken. This is why I would prefer the default to be batch_size=1, i.e., no batching: the API behavior is predictable, and if the user needs batching it can be specified via ...
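To make the load-imbalance point concrete, a small illustrative calculation (the job times here are made up for the example):

```julia
# Eight fast jobs and two slow ones (10x slower).
times = [1, 1, 1, 1, 1, 1, 1, 1, 10, 10]

# Static batches of 2 over 5 workers; in the unlucky split both slow jobs
# end up in the same batch, so that one worker alone takes 20 time units.
batches = [times[i:i+1] for i in 1:2:length(times)]
static_makespan = maximum(sum.(batches))   # == 20

# With batch_size = 1, idle workers keep pulling single jobs, so the two
# slow jobs land on different workers and the total time is roughly 11.
```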
The current code already handles that and uses:

```julia
batches = batchsplit(c, min_batch_count = length(p) * 3)
```

For 5 workers, `min_batch_count` is 15, and the batch size is adjusted by this code:

```julia
# If there are not enough batches, use a smaller batch size...
if length(head) < min_batch_count
    batch_size = max(1, div(sum(length, head), min_batch_count))
    return partition(collect(flatten(head)), batch_size)
end
```

e.g.

```julia
# 5 workers...
julia> p = 1:5
1:5

# 10 jobs...
julia> c = 1:10
1:10

julia> collect(batchsplit(c, min_batch_count = length(p) * 3))
10-element Array{Array{Int64,1},1}:
 [1]
 [2]
 [3]
 [4]
 [5]
 [6]
 [7]
 [8]
 [9]
 [10]
```

In fact, the batch size is still 1 for up to 29 jobs:

```julia
julia> c = 1:29
1:29

julia> collect(batchsplit(c, min_batch_count = length(p) * 3))
29-element Array{Array{Int64,1},1}:
 [1]
 [2]
 [3]
 [4]
 [5]
 [6]
 [7]
 [8]
 [9]
 [10]
 [11]
 [12]
 [13]
 [14]
 [15]
 [16]
 [17]
 [18]
 [19]
 [20]
 [21]
 [22]
 [23]
 [24]
 [25]
 [26]
 [27]
 [28]
 [29]
```

For 30 jobs, batch size is set to 2:

```julia
julia> c = 1:30
1:30

julia> collect(batchsplit(c, min_batch_count = length(p) * 3))
15-element Array{Array{Int64,1},1}:
 [1,2]
 [3,4]
 [5,6]
 [7,8]
 [9,10]
 [11,12]
 [13,14]
 [15,16]
 [17,18]
 [19,20]
 [21,22]
 [23,24]
 [25,26]
 [27,28]
 [29,30]
```

... obviously this could be tuned so that batching only takes effect for larger numbers of jobs.
BUG in current code: The example above caused an error. It seems that Jeff's new ... This patch works around that:

```diff
diff --git a/base/pmap.jl b/base/pmap.jl
index acf48fa..e40192e 100644
--- a/base/pmap.jl
+++ b/base/pmap.jl
@@ -73,7 +73,7 @@ function batchsplit(c; min_batch_count=1, max_batch_size=100)
     # If there are not enough batches, use a smaller batch size...
     if length(head) < min_batch_count
         batch_size = max(1, div(sum(length, head), min_batch_count))
-        return partition(flatten(head), batch_size)
+        return partition(collect(flatten(head)), batch_size)
     end
     return flatten((head, tail))
```
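A minimal stand-alone illustration of the workaround, using the `Base.Iterators` names from current Julia (`head` here is just sample data):

```julia
using Base.Iterators: flatten, partition

head = [[1], [2], [3], [4], [5]]
batch_size = 2

# Materializing the lazily flattened head with collect before partitioning:
collect(partition(collect(flatten(head)), batch_size))
# => [[1, 2], [3, 4], [5]] (as array slices)
```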
OK. Can set default to ...
How about adding another keyword argument to return an iterator?

Another argument against a batch_size > 1 is when each task consumes all of RAM. That's my use case.

I think the main source of tension here in the API is between the use case of asynchronous tasks and the use case of distributed workers. Having a single function with keyword args that can flexibly specify any combination of the two serves both needs.

Lastly, can someone please answer my second question in my first post here about workers being dynamically added to the pool as they become available? Thanks.
Force-pushed from 5359edf to c04c717.
Ready for review. I have kept the default batch size at 1 for now as I am not very comfortable with the idea of automatic batching yet. I'll submit a separate PR for automatic removal of failed workers from worker pools. Exposing / exporting sub-functionalities of pmap independently can be in a separate PR too.
Force-pushed from c04c717 to 8a04e81.
```julia
    :batch_size => 1,
    :on_error => nothing)

const PMAP_KW_NAMES = [:distributed, :batch_size, :on_error]
```
It would be less redundant to say `const PMAP_KW_NAMES = keys(DEFAULT_PMAP_ARGS)`.
`collect(keys(DEFAULT_PMAP_ARGS))` then, right?
Yes.
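A sketch of what that suggestion looks like; the `Dict` contents below simply mirror the defaults discussed in this PR:

```julia
const DEFAULT_PMAP_ARGS = Dict(:distributed => true,
                               :batch_size  => 1,
                               :on_error    => nothing)

# Derive the accepted keyword names from the defaults instead of repeating them:
const PMAP_KW_NAMES = collect(keys(DEFAULT_PMAP_ARGS))
```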
At the very beginning of this pmap refactoring someone made the suggestion to limit the number of tasks to 100 because otherwise problems would arise. I can't find the original discussion or the issue that was linked to then. Could someone please remind me? I ask because I'm having problems adding more than 100 procs and think it might be related. Thanks.
The `AsyncGenerator` constructor accepts a ...

Can you describe what the problems are?
```julia
if (distributed == false) ||
    (length(p) == 0) ||
    (length(p) == 1 && fetch(p.channel) == myid())
```
How can this happen?
When the `pmap` is run from a different worker.

Also, we should add the master process to `default_worker_pool` and remove it once the first worker is added, since in the absence of any workers all remote calls should execute on the master itself. In the current implementation `remote` with `nprocs() == 1` would wait for a worker to be added. I'll change it.
Force-pushed from d3ca9de to 58785fc.
I am going to merge this after a green CI. Have documented the minimum 50 ms delay for `retry`, and will open a new issue to discuss changing defaults once this is merged.
Returns a lambda that retries function ``f`` up to ``n`` times in the event of an exception. If ``condition`` is a ``Type`` then retry only for exceptions of that type. If ``condition`` is a function ``cond(::Exception) -> Bool`` then retry only if it is true.

Returns a lambda that retries function ``f`` up to ``n`` times in the event of an exception. If ``retry_on`` is a ``Type`` then retry only for exceptions of that type. If ``retry_on`` is a function ``test_error(::Exception) -> Bool`` then retry only if it is true.

The first retry happens after a gap of 50 milliseconds or ``max_delay``, whichever is lower. Subsequently, the delays between retries are exponentially increased with a random factor up to ``max_delay``.
It seems odd to me to describe the exponential back-off but have the default `max_delay` set so that there is no exponential back-off. Would you consider leaving `max_delay` at 10 in this PR so that it can be discussed further before it is changed? After all, now that the default `n` is `1`, `max_delay` has no effect anyway unless the user changes `n`.
OK.
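For illustration, a hedged usage sketch of `retry` as described in the quoted docs. The signature follows the call shown later in this thread; the flaky function and the keyword values are placeholders, and the exact defaults may differ in the merged version:

```julia
# A deliberately flaky function: fails ~70% of the time with an ErrorException.
unreliable() = rand() < 0.7 ? error("flaky failure") : "ok"

# Retry up to 3 times, only for ErrorException, with retry delays capped at 10 s.
f = retry(unreliable, ErrorException; n = 3, max_delay = 10.0)
f()
```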
Hi @amitmurthy, ...
Is it an error that the manual entry says the default is ...
OK. Will hold off the merge till the weekend. Since ...
Force-pushed from 58785fc to 7cdfd1a.
@tkelman is the AV error a known issue?
It's awfully frequent lately but I don't know what's causing it. One of the workers may have frozen?
is used.

By default, `pmap` distributes the computation over all specified workers. To use only the
local process and distribute over tasks, specifiy `distributed=false`. This is equivalent to `asyncmap`.
specify
OK. Will go ahead and merge then.
```julia
end

if retry_n > 0
    f = wrap_retry(f, retry_on, retry_n, retry_max_delay)
```
Why not just `retry(f, retry_on; n=n, max_delay=retry_max_delay)`?
This results in a stack overflow:

```julia
f = () -> 1
f = () -> f() + 1
f()
```

This works:

```julia
f = () -> 1
function wrap(f)
    () -> f() + 1
end
f = wrap(f)
f()
```

I don't know if this is by design. Should open an issue if not.
Different variable names for `f` at each step would have removed the need for the wrapping function.
There are a few longish lines of code and doc where the surrounding code was previously 80-cols.
""" | ||
@catch(f) -> Function |
There is no mention of deleting `@catch` in the PR description or the commit logs.
This PR is a continuation of the discussion in #15409. But yes, mentioning it here / in the commit log would have helped.
Yes, we should refactor this to have ...
Perhaps you should just back out the whole batching thing for now and give a simple example of how to use `flatten` + `batchsplit` in the doc.
(Fat phone fingers pressed Comment prematurely)
We can keep the other iterators simple and move more logic only into ...
This PR enables customization of the new pmap implementation via keyword args.

`distributed=true`

By default `pmap` is a parallel map using all available workers. With `distributed=false`, the parallelization is limited to using tasks in the current process and can be used in place of `@sync`/`@async` blocks. For example, we may want to retrieve data from other processes in parallel using an explicit `@sync`/`@async` construction, which can now be written as a single `pmap` call with `distributed=false` (see the sketch after this description).

`batch_size=1`

By default, `pmap` will not perform any batching. You may specify an explicit integer value for the `batch_size`, or specify `batch_size=:auto` wherein `pmap` will create batches depending on the number of workers available.

`on_error = e->rethrow(e)`

By default, `pmap` will stop on any error. Users may override this behavior by supplying an `on_error` function. For example, `on_error = e -> e` will simply return the exception inline, while `on_error = e -> SOME_ERROR_CODE` will return an error code value inline in the results.
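Putting the three keyword args together, a hedged usage sketch (`foo`, `args`, and `data` are placeholders; on current Julia, `pmap`, `workers`, and `remotecall_fetch` live in the Distributed stdlib):

```julia
# Default: parallel map over all available workers.
results = pmap(foo, data)

# Task-level parallelism in the current process only, replacing an explicit
# @sync/@async block that fetches a result from each worker:
results = pmap(p -> remotecall_fetch(foo, p, args...), workers(); distributed=false)

# Explicit batching and inline error values instead of stopping on the first error:
results = pmap(foo, data; batch_size=10, on_error=e -> e)
```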