Skip to content

Commit

Permalink
Document and test spill->target hysteresis cycle
Browse files Browse the repository at this point in the history
try fix
  • Loading branch information
crusaderky committed Feb 16, 2022
1 parent 8d0df89 commit 1d2f1b4
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 7 deletions.
59 changes: 57 additions & 2 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1365,11 +1365,66 @@ def __reduce__(self):
await wait(f1)
assert set(a.data.memory) == {"f1"}

futures = c.map(OverReport, range(int(100e6), int(100e6) + 8))
with captured_logger("distributed.worker") as logger:
futures = c.map(OverReport, range(int(100e6), int(100e6) + 5))

while not a.data.disk:
await asyncio.sleep(0.01)
assert "f1" in a.data.disk
await asyncio.sleep(0.5)

# Spilling normally starts at the spill threshold and stops at the target threshold.
# In this special case, it stops as soon as the process memory goes below the spill
# threshold, e.g. without a hysteresis cycle. Test that we didn't instead dump the
# whole data to disk (memory_limit * target = 0)
assert "Unmanaged memory use is high" not in logger.getvalue()


@pytest.mark.slow
@gen_cluster(
nthreads=[("", 1)],
client=True,
worker_kwargs=dict(
memory_limit="1 GiB", # See FIXME note in previous test
memory_monitor_interval="10ms",
memory_target_fraction=0.4,
memory_spill_fraction=0.7,
memory_pause_fraction=False,
),
)
async def test_spill_hysteresis(c, s, a):
memory = s.workers[a.address].memory.process
a.memory_limit = memory + 2**30

# Under-report managed memory, so that we reach the spill threshold for process
# memory withouth first reaching the target threshold for managed memory
class UnderReport:
def __init__(self):
self.data = "x" * (50 * 2**20) # 50 MiB

def __sizeof__(self):
return 1

def __reduce__(self):
"""Speed up test by writing very little to disk when spilling"""
return UnderReport, ()

max_in_memory = 0
futures = []
while not a.data.disk:
futures.append(c.submit(UnderReport, pure=False))
max_in_memory = max(max_in_memory, len(a.data.memory))
await wait(futures)
await asyncio.sleep(0.1)
max_in_memory = max(max_in_memory, len(a.data.memory))

# If there were no hysteresis, we would lose exactly 1 key.
# Note that, for this test to be meaningful, memory must shrink down readily when
# we deallocate Python objects. This is not always the case on Windows and MacOSX;
# on Linux we set MALLOC_TRIM to help in that regard.
# To verify that this test is useful, set target=spill and watch it fail.
while len(a.data.memory) > max_in_memory - 3:
await asyncio.sleep(0.01)
assert "f1" in a.data.disk


@pytest.mark.slow
Expand Down
9 changes: 8 additions & 1 deletion distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3743,7 +3743,14 @@ def check_pause(memory):
frac * 100,
)
start = time()
target = self.memory_limit * self.memory_target_fraction
# Implement hysteresis cycle where spilling starts at the spill threshold
# and stops at the target threshold. Normally that here the target threshold
# defines process memory, whereas normally it defines reported managed
# memory (e.g. output of sizeof() ).
# If target=False, disable hysteresis.
target = self.memory_limit * (
self.memory_target_fraction or self.memory_spill_fraction
)
count = 0
need = memory - target
while memory > target:
Expand Down
12 changes: 8 additions & 4 deletions docs/source/worker.rst
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,14 @@ by dask-worker ::
Workers use a few different heuristics to keep memory use beneath this limit:

1. At 60% of memory load (as estimated by ``sizeof``), spill least recently used data
to disk
to disk.
2. At 70% of memory load (as reported by the OS), spill least recently used data to
disk regardless of what is reported by ``sizeof``; this accounts for memory used by
the python interpreter, modules, global variables, memory leaks, etc.
the python interpreter, modules, global variables, memory leaks, etc. The spilling
stops when the memory goes below 60%, in a hysteresis cycle.
3. At 80% of memory load (as reported by the OS), stop accepting new work on local
thread pool
4. At 95% of memory load (as reported by the OS), terminate and restart the worker
thread pool.
4. At 95% of memory load (as reported by the OS), terminate and restart the worker.

These values can be configured by modifying the ``~/.config/dask/distributed.yaml``
file:
Expand All @@ -184,6 +185,9 @@ file:
pause: 0.80 # fraction at which we pause worker threads
terminate: 0.95 # fraction at which we terminate the worker
It is possible to individually disable any of these by setting its value to False.
Setting 'target' while leaving 'spill' active disables the spill hysteresis cycle.


Spill data to disk
~~~~~~~~~~~~~~~~~~
Expand Down

0 comments on commit 1d2f1b4

Please sign in to comment.