crusaderky committed Sep 1, 2021
1 parent dfb1a09 commit dabe046
Showing 2 changed files with 31 additions and 26 deletions.
31 changes: 17 additions & 14 deletions distributed/active_memory_manager.py
@@ -46,10 +46,10 @@ def __init__(
scheduler: SchedulerState,
# The following parameters are exposed so that one may create, run, and throw
# away on the fly a specialized manager, separate from the main one.
- policies: set[ActiveMemoryManagerPolicy] = None,
+ policies: Optional[set[ActiveMemoryManagerPolicy]] = None,
register: bool = True,
- start: bool = None,
- interval: float = None,
+ start: Optional[bool] = None,
+ interval: Optional[float] = None,
):
self.scheduler = scheduler

@@ -91,7 +91,7 @@ def __init__(
self.start()

def start(self, comm=None) -> None:
"""Start executing very <interval> seconds until scheduler shutdown"""
"""Start executing every ``self.interval`` seconds until scheduler shutdown"""
pc = PeriodicCallback(self.run_once, self.interval * 1000.0)
self.scheduler.periodic_callbacks["amm"] = pc
pc.start()
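
The corrected docstring matches what `start()` actually does: it schedules `run_once` on a `PeriodicCallback`, whose period argument is given in milliseconds, hence the `self.interval * 1000.0` conversion. A minimal sketch of that pattern, assuming tornado's `PeriodicCallback` and a hypothetical 2-second interval:

import asyncio

from tornado.ioloop import PeriodicCallback

interval = 2.0  # seconds; hypothetical value


def run_once() -> None:
    print("AMM iteration")


async def main() -> None:
    # PeriodicCallback expects milliseconds, hence the * 1000.0 conversion.
    pc = PeriodicCallback(run_once, interval * 1000.0)
    pc.start()
    await asyncio.sleep(5)  # let the callback fire a couple of times
    pc.stop()


asyncio.run(main())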
@@ -188,7 +188,7 @@ def _find_recipient(
candidates: Optional[set[WorkerState]],
pending_repl: set[WorkerState],
) -> Optional[WorkerState]:
"""Choose a worker to acquire a new replica of an-in-memory task among a set of
"""Choose a worker to acquire a new replica of an in-memory task among a set of
candidates. If candidates is None, default to all workers in the cluster that do
not hold a replica yet. The worker with the lowest memory usage (downstream of
pending replications and drops) will be returned.
@@ -201,10 +201,7 @@ def _find_recipient(
candidates -= pending_repl
if not candidates:
return None
- # id(ws) is there just to prevent WorkerState objects to be compared in the
- # unlikely event that they have exactly the same amount of bytes allocated.
- _, _, ws = min((self.workers_memory[ws], id(ws), ws) for ws in candidates)
- return ws
+ return min(candidates, key=self.workers_memory.get)

def _find_dropper(
self,
@@ -227,10 +224,7 @@ def _find_dropper(
candidates -= {waiter_ts.processing_on for waiter_ts in ts.waiters}
if not candidates:
return None
- # id(ws) is there just to prevent WorkerState objects to be compared in the
- # unlikely event that they have exactly the same amount of bytes allocated.
- _, _, ws = max((self.workers_memory[ws], id(ws), ws) for ws in candidates)
- return ws
+ return max(candidates, key=self.workers_memory.get)


class ActiveMemoryManagerPolicy:
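
Both `_find_recipient` and `_find_dropper` previously built `(memory, id(ws), ws)` tuples so that a tie on memory would never fall through to comparing `WorkerState` objects. Passing `key=self.workers_memory.get` makes the tuple unnecessary: `min`/`max` compare only the key values and keep the first candidate seen on a tie. A standalone sketch of the same idea, with plain strings standing in for `WorkerState` and a dict standing in for `self.workers_memory`:

# Stand-ins: strings instead of WorkerState, a dict instead of self.workers_memory.
workers_memory = {"w1": 300, "w2": 100, "w3": 100}
candidates = {"w1", "w2", "w3"}

# Recipient: least memory wins; on a tie whichever candidate the set yields first
# is kept, and the candidates themselves are never compared to each other.
recipient = min(candidates, key=workers_memory.get)
assert workers_memory[recipient] == 100

# Dropper: most memory wins.
dropper = max(candidates, key=workers_memory.get)
assert dropper == "w1"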
@@ -297,9 +291,18 @@ def run(self):
continue

desired_replicas = 1 # TODO have a marker on TaskState
- ndrop = len(ts.who_has) - max(desired_replicas, len(ts.waiters))
+
+ # If a dependent task has not been assigned to a worker yet, err on the side
+ # of caution and preserve an additional replica for it.
+ # However, if two dependent tasks have been already assigned to the same
+ # worker, don't double count them.
+ nwaiters = len({waiter.processing_on or waiter for waiter in ts.waiters})
+
+ ndrop = len(ts.who_has) - max(desired_replicas, nwaiters)
if ts in self.manager.pending:
pending_repl, pending_drop = self.manager.pending[ts]
ndrop += len(pending_repl) - len(pending_drop)

+ # ndrop could be negative, which for range() is the same as 0.
for _ in range(ndrop):
yield "drop", ts, None
26 changes: 14 additions & 12 deletions distributed/tests/test_active_memory_manager.py
@@ -73,7 +73,7 @@ async def test_drop(c, s, *workers):
while len(s.tasks["x"].who_has) > 1:
await asyncio.sleep(0.01)
# The last copy is never dropped even if the policy asks so
- await asyncio.sleep(0.1)
+ await asyncio.sleep(0.2)
assert len(s.tasks["x"].who_has) == 1


@@ -88,14 +88,16 @@ async def test_start_stop(c, s, a, b):
await c.scheduler.amm_stop()
# AMM is not running anymore
await c.replicate(x, 2)
- await asyncio.sleep(0.1)
+ await asyncio.sleep(0.2)
assert len(s.tasks["x"].who_has) == 2


- @gen_cluster(client=True, config=demo_config("drop", start=True))
+ @gen_cluster(client=True, config=demo_config("drop", start=True, interval=0.1))
async def test_auto_start(c, s, a, b):
futures = await c.scatter({"x": 123}, broadcast=True)
- await asyncio.sleep(0.3)
+ # The AMM should run within 0.1s of the broadcast.
+ # Add generous extra padding to prevent flakiness.
+ await asyncio.sleep(0.5)
assert len(s.tasks["x"].who_has) == 1


@@ -138,7 +140,7 @@ async def test_drop_with_waiter(c, s, a, b):
await asyncio.sleep(0.01)

s.extensions["amm"].run_once()
- await asyncio.sleep(0.1)
+ await asyncio.sleep(0.2)
assert {ws.address for ws in s.tasks["x"].who_has} == {a.address, b.address}
assert await y1 == 2
# y1 is finished so there's a worker available without a waiter
@@ -170,7 +172,7 @@ def run(self):
amm.run_once()
while len(s.tasks["x"].who_has) > 1:
await asyncio.sleep(0.01)
- await asyncio.sleep(0.1)
+ await asyncio.sleep(0.2)
assert len(s.tasks["x"].who_has) == 1


@@ -183,7 +185,7 @@ async def test_double_drop_stress(c, s, a, b):
s.extensions["amm"].run_once()
while len(s.tasks["x"].who_has) > 1:
await asyncio.sleep(0.01)
- await asyncio.sleep(0.1)
+ await asyncio.sleep(0.2)
assert len(s.tasks["x"].who_has) == 1


@@ -234,7 +236,7 @@ async def test_drop_with_empty_candidates(c, s, a, b):
"""
futures = await c.scatter({"x": 1}, broadcast=True)
s.extensions["amm"].run_once()
- await asyncio.sleep(0.1)
+ await asyncio.sleep(0.2)
assert len(s.tasks["x"].who_has) == 2


@@ -250,7 +252,7 @@ async def test_drop_from_candidates_without_key(c, s, *workers):
assert s.tasks["x"].who_has == {ws0, ws1}

s.extensions["amm"].run_once()
- await asyncio.sleep(0.1)
+ await asyncio.sleep(0.2)
assert s.tasks["x"].who_has == {ws0, ws1}


@@ -316,7 +318,7 @@ async def test_replicate(c, s, *workers):
s.extensions["amm"].run_once()
while len(s.tasks["x"].who_has) < 3:
await asyncio.sleep(0.01)
- await asyncio.sleep(0.1)
+ await asyncio.sleep(0.2)
assert len(s.tasks["x"].who_has) == 3

s.extensions["amm"].run_once()
@@ -402,7 +404,7 @@ async def test_replicate_with_empty_candidates(c, s, a, b):
"""
futures = await c.scatter({"x": 1})
s.extensions["amm"].run_once()
- await asyncio.sleep(0.1)
+ await asyncio.sleep(0.2)
assert len(s.tasks["x"].who_has) == 1


@@ -412,7 +414,7 @@ async def test_replicate_to_candidates_with_key(c, s, a, b):
ws0, ws1 = s.workers.values() # Not necessarily a, b; it could be b, a!
futures = await c.scatter({"x": 1}, workers=[ws0.address])
s.extensions["amm"].run_once()
- await asyncio.sleep(0.1)
+ await asyncio.sleep(0.2)
assert s.tasks["x"].who_has == {ws0}

