I think there is a logic error in the bookkeeping for `TaskGroup.nbytes_in_memory`: there's a discrepancy between how we increment it and how we decrement it when multiple workers hold the same key.

In `transition_memory_released`, we decrement it by `nbytes` once for every worker that holds the task:
distributed/distributed/scheduler.py, lines 2646 to 2649 in 6340b5b:

```python
for ws in ts._who_has:
    del ws._has_what[ts]
    ws._nbytes -= ts_nbytes
    ts._group._nbytes_in_memory -= ts_nbytes
```
Whereas in `_propagate_forgotten`, we decrement it by `nbytes` once if any worker holds the task, regardless of how many. This doesn't match `transition_memory_released`:

distributed/distributed/scheduler.py, lines 7339 to 7341 in 646b12b:
```python
ts_nbytes: Py_ssize_t = ts.get_nbytes()
if ts._who_has:
    ts._group._nbytes_in_memory -= ts_nbytes
```
On the creation side, in `TaskState.set_nbytes`, we only increment it by the difference between the last known value and the current value. When the key is being copied to additional workers, this difference is usually 0:
distributed/distributed/scheduler.py, lines 1556 to 1562 in 6340b5b:

```python
def set_nbytes(self, nbytes: Py_ssize_t):
    diff: Py_ssize_t = nbytes
    old_nbytes: Py_ssize_t = self._nbytes
    if old_nbytes >= 0:
        diff -= old_nbytes
    self._group._nbytes_total += diff
    self._group._nbytes_in_memory += diff
```
In short, I think `TaskGroup.nbytes_in_memory` is incremented once per key, but decremented once per copy of the key.
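To illustrate the drift, here's a minimal sketch (not the actual scheduler code paths, just the arithmetic) for a single 100-byte key replicated to three workers:

```python
# Hypothetical walk-through of the counter for one 100-byte key on 3 workers.
nbytes_in_memory = 0

# set_nbytes: the first call adds the full size; later calls add diff == 0
# because the key's size hasn't changed.
nbytes_in_memory += 100  # worker A reports the key
nbytes_in_memory += 0    # worker B gains a copy
nbytes_in_memory += 0    # worker C gains a copy

# transition_memory_released: decrements once per worker in ts._who_has.
for _ in range(3):
    nbytes_in_memory -= 100

assert nbytes_in_memory == -200  # the counter has drifted below zero
```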
If `nbytes` can differ between workers, then to do this bookkeeping correctly I think we'd also need to track a `TaskState.total_nbytes` (the size of all copies of the key), and then decrement by that once in `transition_memory_released` and `_propagate_forgotten`.
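A rough sketch of what that could look like (the `total_nbytes` attribute and the helper methods below are hypothetical, not existing scheduler API):

```python
class TaskStateSketch:
    """Hypothetical per-task bookkeeping that tracks the size of all replicas."""

    def __init__(self, nbytes: int, group):
        self.nbytes = nbytes       # size of a single copy of the key
        self.total_nbytes = 0      # size summed over all workers holding a copy
        self.group = group

    def add_replica(self):
        # Called whenever another worker gains a copy of the key.
        self.total_nbytes += self.nbytes
        self.group.nbytes_in_memory += self.nbytes

    def release_all(self):
        # Called exactly once from transition_memory_released /
        # _propagate_forgotten, instead of once per worker.
        self.group.nbytes_in_memory -= self.total_nbytes
        self.total_nbytes = 0
```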
Discovered in #4925 (comment). I think #4925 made this more apparent, since it encourages more data replication.
cc @crusaderky since you know more about replicated keys.