Skip to content

Commit

Permalink
test load balancing with map
Browse files Browse the repository at this point in the history
  • Loading branch information
mrocklin committed Jul 25, 2018
1 parent b8b985b commit 112bb36
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 2 deletions.
6 changes: 4 additions & 2 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1320,8 +1320,10 @@ def map(self, func, *iterables, **kwargs):
user_priority = kwargs.pop('priority', 0)
allow_other_workers = kwargs.pop('allow_other_workers', False)
fifo_timeout = kwargs.pop('fifo_timeout', '100ms')
actor = kwargs.pop('actor', False)
actors = kwargs.pop('actors', False)
pure = kwargs.pop('pure', not actors)
actor = actor or actors
pure = kwargs.pop('pure', not actor)

if allow_other_workers and workers is None:
raise ValueError("Only use allow_other_workers= if using workers=")
Expand Down Expand Up @@ -1380,7 +1382,7 @@ def map(self, func, *iterables, **kwargs):
retries=retries,
user_priority=user_priority,
fifo_timeout=fifo_timeout,
actors=actors)
actors=actor)
logger.debug("map(%s, ...)", funcname(func))

return [futures[tokey(k)] for k in keys]
Expand Down
15 changes: 15 additions & 0 deletions distributed/tests/test_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,3 +390,18 @@ def __init__(self, x):

assert s.tasks[x.key].who_has == {ws} # first went to best match
assert s.tasks[x.key].who_has != s.tasks[y.key].who_has # second load balanced


@gen_cluster(client=True, ncores=[('127.0.0.1', 1)] * 5)
def test_load_balance_map(c, s, *workers):
class Foo(object):
def __init__(self, x, y=None):
pass

b = c.submit(operator.mul, 'b', 1000000)
yield wait(b)

actors = c.map(Foo, range(10), y=b, actor=True)
yield wait(actors)

assert all(len(w.actors) == 2 for w in workers)

0 comments on commit 112bb36

Please sign in to comment.