diff --git a/distributed/active_memory_manager.py b/distributed/active_memory_manager.py index 790164385c4..4ed2daf4113 100644 --- a/distributed/active_memory_manager.py +++ b/distributed/active_memory_manager.py @@ -291,9 +291,18 @@ def run(self): continue desired_replicas = 1 # TODO have a marker on TaskState - ndrop = len(ts.who_has) - max(desired_replicas, len(ts.waiters)) + + # If a dependent task has not been assigned to a worker yet, err on the side + # of caution and preserve an additional replica for it. + # However, if two dependent tasks have been already assigned to the same + # worker, don't double count them. + nwaiters = len({waiter.processing_on or waiter for waiter in ts.waiters}) + + ndrop = len(ts.who_has) - max(desired_replicas, nwaiters) if ts in self.manager.pending: pending_repl, pending_drop = self.manager.pending[ts] ndrop += len(pending_repl) - len(pending_drop) + + # ndrop could be negative, which for range() is the same as 0. for _ in range(ndrop): yield "drop", ts, None