Skip to content

Commit

Permalink
Code review
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Sep 1, 2021
1 parent f277a00 commit c454ab5
Showing 1 changed file with 10 additions and 1 deletion.
11 changes: 10 additions & 1 deletion distributed/active_memory_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,9 +291,18 @@ def run(self):
continue

desired_replicas = 1 # TODO have a marker on TaskState
ndrop = len(ts.who_has) - max(desired_replicas, len(ts.waiters))

# If a dependent task has not been assigned to a worker yet, err on the side
# of caution and preserve an additional replica for it.
# However, if two dependent tasks have been already assigned to the same
# worker, don't double count them.
nwaiters = len({waiter.processing_on or waiter for waiter in ts.waiters})

ndrop = len(ts.who_has) - max(desired_replicas, nwaiters)
if ts in self.manager.pending:
pending_repl, pending_drop = self.manager.pending[ts]
ndrop += len(pending_repl) - len(pending_drop)

# ndrop could be negative, which for range() is the same as 0.
for _ in range(ndrop):
yield "drop", ts, None

0 comments on commit c454ab5

Please sign in to comment.