Skip to content

Commit

Permalink
Code review
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Sep 1, 2021
1 parent a41f890 commit f277a00
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 23 deletions.
16 changes: 5 additions & 11 deletions distributed/active_memory_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ def __init__(
scheduler: SchedulerState,
# The following parameters are exposed so that one may create, run, and throw
# away on the fly a specialized manager, separate from the main one.
policies: set[ActiveMemoryManagerPolicy] | None = None,
policies: Optional[set[ActiveMemoryManagerPolicy]] = None,
register: bool = True,
start: bool | None = None,
interval: float | None = None,
start: Optional[bool] = None,
interval: Optional[float] = None,
):
self.scheduler = scheduler

Expand Down Expand Up @@ -201,10 +201,7 @@ def _find_recipient(
candidates -= pending_repl
if not candidates:
return None
# id(ws) is there just to prevent WorkerState objects to be compared in the
# unlikely event that they have exactly the same amount of bytes allocated.
_, _, ws = min((self.workers_memory[ws], id(ws), ws) for ws in candidates)
return ws
return min(candidates, key=self.workers_memory.get)

def _find_dropper(
self,
Expand All @@ -227,10 +224,7 @@ def _find_dropper(
candidates -= {waiter_ts.processing_on for waiter_ts in ts.waiters}
if not candidates:
return None
# id(ws) is there just to prevent WorkerState objects to be compared in the
# unlikely event that they have exactly the same amount of bytes allocated.
_, _, ws = max((self.workers_memory[ws], id(ws), ws) for ws in candidates)
return ws
return max(candidates, key=self.workers_memory.get)


class ActiveMemoryManagerPolicy:
Expand Down
26 changes: 14 additions & 12 deletions distributed/tests/test_active_memory_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async def test_drop(c, s, *workers):
while len(s.tasks["x"].who_has) > 1:
await asyncio.sleep(0.01)
# The last copy is never dropped even if the policy asks so
await asyncio.sleep(0.1)
await asyncio.sleep(0.2)
assert len(s.tasks["x"].who_has) == 1


Expand All @@ -88,14 +88,16 @@ async def test_start_stop(c, s, a, b):
await c.scheduler.amm_stop()
# AMM is not running anymore
await c.replicate(x, 2)
await asyncio.sleep(0.1)
await asyncio.sleep(0.2)
assert len(s.tasks["x"].who_has) == 2


@gen_cluster(client=True, config=demo_config("drop", start=True))
@gen_cluster(client=True, config=demo_config("drop", start=True, interval=0.1))
async def test_auto_start(c, s, a, b):
futures = await c.scatter({"x": 123}, broadcast=True)
await asyncio.sleep(0.3)
# The AMM should run within 0.1s of the broadcast.
# Add generous extra padding to prevent flakiness.
await asyncio.sleep(0.5)
assert len(s.tasks["x"].who_has) == 1


Expand Down Expand Up @@ -138,7 +140,7 @@ async def test_drop_with_waiter(c, s, a, b):
await asyncio.sleep(0.01)

s.extensions["amm"].run_once()
await asyncio.sleep(0.1)
await asyncio.sleep(0.2)
assert {ws.address for ws in s.tasks["x"].who_has} == {a.address, b.address}
assert await y1 == 2
# y1 is finished so there's a worker available without a waiter
Expand Down Expand Up @@ -170,7 +172,7 @@ def run(self):
amm.run_once()
while len(s.tasks["x"].who_has) > 1:
await asyncio.sleep(0.01)
await asyncio.sleep(0.1)
await asyncio.sleep(0.2)
assert len(s.tasks["x"].who_has) == 1


Expand All @@ -183,7 +185,7 @@ async def test_double_drop_stress(c, s, a, b):
s.extensions["amm"].run_once()
while len(s.tasks["x"].who_has) > 1:
await asyncio.sleep(0.01)
await asyncio.sleep(0.1)
await asyncio.sleep(0.2)
assert len(s.tasks["x"].who_has) == 1


Expand Down Expand Up @@ -234,7 +236,7 @@ async def test_drop_with_empty_candidates(c, s, a, b):
"""
futures = await c.scatter({"x": 1}, broadcast=True)
s.extensions["amm"].run_once()
await asyncio.sleep(0.1)
await asyncio.sleep(0.2)
assert len(s.tasks["x"].who_has) == 2


Expand All @@ -250,7 +252,7 @@ async def test_drop_from_candidates_without_key(c, s, *workers):
assert s.tasks["x"].who_has == {ws0, ws1}

s.extensions["amm"].run_once()
await asyncio.sleep(0.1)
await asyncio.sleep(0.2)
assert s.tasks["x"].who_has == {ws0, ws1}


Expand Down Expand Up @@ -316,7 +318,7 @@ async def test_replicate(c, s, *workers):
s.extensions["amm"].run_once()
while len(s.tasks["x"].who_has) < 3:
await asyncio.sleep(0.01)
await asyncio.sleep(0.1)
await asyncio.sleep(0.2)
assert len(s.tasks["x"].who_has) == 3

s.extensions["amm"].run_once()
Expand Down Expand Up @@ -402,7 +404,7 @@ async def test_replicate_with_empty_candidates(c, s, a, b):
"""
futures = await c.scatter({"x": 1})
s.extensions["amm"].run_once()
await asyncio.sleep(0.1)
await asyncio.sleep(0.2)
assert len(s.tasks["x"].who_has) == 1


Expand All @@ -412,7 +414,7 @@ async def test_replicate_to_candidates_with_key(c, s, a, b):
ws0, ws1 = s.workers.values() # Not necessarily a, b; it could be b, a!
futures = await c.scatter({"x": 1}, workers=[ws0.address])
s.extensions["amm"].run_once()
await asyncio.sleep(0.1)
await asyncio.sleep(0.2)
assert s.tasks["x"].who_has == {ws0}


Expand Down

0 comments on commit f277a00

Please sign in to comment.