Revisit documentation
crusaderky committed Feb 21, 2022
1 parent ba9d53f commit f32a7cd
Showing 2 changed files with 61 additions and 76 deletions.
8 changes: 4 additions & 4 deletions distributed/distributed.yaml
@@ -140,10 +140,10 @@ distributed:

# Fractions of worker process memory at which we take action to avoid memory
# blowup. Set any of the values to False to turn off the behavior entirely.
target: 0.60 # target fraction to stay below
spill: 0.70 # fraction at which we spill to disk
pause: 0.80 # fraction at which we pause worker threads
terminate: 0.95 # fraction at which we terminate the worker
target: 0.60 # fraction of managed memory where we start spilling to disk
spill: 0.70 # fraction of process memory where we start spilling to disk
pause: 0.80 # fraction of process memory at which we pause worker threads
terminate: 0.95 # fraction of process memory at which we terminate the worker

# Max size of the spill file on disk (e.g. "10 GB")
# Set to false for no maximum.
129 changes: 57 additions & 72 deletions docs/source/worker.rst
@@ -149,7 +149,6 @@ exceptions to this are when:

Memory Management
-----------------

Workers are given a target memory limit to stay under with the
command line ``--memory-limit`` keyword or the ``memory_limit=`` Python
keyword argument, which sets the memory limit per worker process launched
@@ -160,106 +159,92 @@ by dask-worker ::
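
In addition to the ``dask-worker`` command line flag described above, a minimal
Python-side sketch (the cluster size and limit below are illustrative only, not taken
from the original documentation) would be:

.. code-block:: python

   from dask.distributed import Client, LocalCluster

   # Each worker gets its own 4 GiB limit; the target/spill/pause/terminate
   # thresholds discussed below are all fractions of this per-worker value.
   cluster = LocalCluster(n_workers=2, threads_per_worker=1, memory_limit="4 GiB")
   client = Client(cluster)

   # Each worker entry reports its memory_limit in bytes
   print(client.scheduler_info()["workers"])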

Workers use a few different heuristics to keep memory use beneath this limit:

1. At 60% of memory load (as estimated by ``sizeof``), spill least recently used data
to disk.
2. At 70% of memory load (as reported by the OS), spill least recently used data to
disk regardless of what is reported by ``sizeof``; this accounts for memory used by
the python interpreter, modules, global variables, memory leaks, etc. The spilling
stops when the memory goes below 60%, in a hysteresis cycle.
3. At 80% of memory load (as reported by the OS), stop accepting new work on local
thread pool.
4. At 95% of memory load (as reported by the OS), terminate and restart the worker.

These values can be configured by modifying the ``~/.config/dask/distributed.yaml``
file:

.. code-block:: yaml

   distributed:
     worker:
       # Fractions of worker memory at which we take action to avoid memory blowup
       # Set any of the lower three values to False to turn off the behavior entirely
       memory:
         target: 0.60     # target fraction to stay below
         spill: 0.70      # fraction at which we spill to disk
         pause: 0.80      # fraction at which we pause worker threads
         terminate: 0.95  # fraction at which we terminate the worker

It is possible to individually disable any of these by setting its value to False.
Setting 'target' to False while leaving 'spill' active disables the spill hysteresis
cycle.


Spill data to disk
~~~~~~~~~~~~~~~~~~
Spilling based on managed memory
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Every time the worker finishes a task, it estimates the size in bytes that the result
costs to keep in memory using the ``sizeof`` function. This function defaults to
:func:`sys.getsizeof` for arbitrary objects, which uses the standard Python
``__sizeof__`` protocol, but also has special-cased implementations for common data
types like NumPy arrays and Pandas dataframes. The sum of the ``sizeof`` of all data
tracked by Dask is called :ref:`managed memory <memtypes>`.
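
As an illustration (not part of the original text), ``dask.sizeof.sizeof`` can be called
directly to see what a worker would record as managed memory for an object; the numbers
below are approximate and vary by platform and library version:

.. code-block:: python

   import numpy as np
   import pandas as pd
   from dask.sizeof import sizeof

   # Special-cased implementation for NumPy arrays: roughly the array's nbytes
   print(sizeof(np.ones(1_000_000)))                  # ~8_000_000 bytes

   # Special-cased implementation for pandas DataFrames
   print(sizeof(pd.DataFrame({"x": range(10_000)})))

   # Fallback to sys.getsizeof for arbitrary Python objects
   print(sizeof(object()))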

Every time the worker finishes a task it estimates the size in bytes that the
result costs to keep in memory using the ``sizeof`` function. This function
defaults to :func:`sys.getsizeof` for arbitrary objects, which uses the standard
Python ``__sizeof__`` protocol, but also has special-cased implementations for
common data types like NumPy arrays and Pandas dataframes.

When the sum of the number of bytes of the data in memory exceeds 60% of the
memory limit, the worker will begin to dump the least recently used data
to disk. You can control this location with the ``--local-directory``
keyword.::
When the managed memory exceeds 60% of the memory limit (*target threshold*), the worker
will begin to dump the least recently used data to disk. You can control this location
with the ``--local-directory`` keyword::

$ dask-worker tcp://scheduler:port --memory-limit="4 GiB" --local-directory /scratch

That data is still available and will be read back from disk when necessary.
On the diagnostic dashboard status page disk I/O will show up in the task
stream plot as orange blocks. Additionally, the memory plot in the upper left
will become yellow and then red.


Monitor process memory load
~~~~~~~~~~~~~~~~~~~~~~~~~~~
That data is still available and will be read back from disk when necessary. On the
diagnostic dashboard status page, disk I/O will show up in the task stream plot as
orange blocks. Additionally, the memory plot in the upper left will show a section of
the bar colored in grey.

Spilling based on process memory
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The approach above can fail for a few reasons:

1. Custom objects may not report their memory size accurately
2. User functions may take up more RAM than expected
3. Significant amounts of data may accumulate in network I/O buffers

To address this we periodically monitor the memory of the worker process every
200 ms. If the system reported memory use is above 70% of the target memory
usage then the worker will start dumping unused data to disk, even if internal
``sizeof`` recording hasn't yet reached the normal 60% limit.


Halt worker threads
~~~~~~~~~~~~~~~~~~~

At 80% load, the worker's thread pool will stop starting computation on
additional tasks in the worker's queue. This gives time for the write-to-disk
functionality to take effect even in the face of rapidly accumulating data.
Currently executing tasks continue to run.
To address this, we periodically monitor the :ref:`process memory <memtypes>` of the
worker every 200 ms. If the memory use reported by the OS is above 70% of the memory
limit (*spill threshold*), then the worker will start dumping unused data to disk, even
if the internal ``sizeof`` accounting hasn't yet reached the normal 60% threshold. This
more aggressive spilling continues until process memory falls below 60%.
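
Spilling on process memory is a safety net. When the culprit is reason 1 above (custom
objects under-reporting their size), it can also help to register a more accurate
``sizeof`` implementation for them. A minimal sketch, assuming a hypothetical ``Blob``
wrapper class that is not part of Dask:

.. code-block:: python

   import numpy as np
   from dask.sizeof import sizeof


   class Blob:
       """Hypothetical wrapper whose default __sizeof__ misses the payload."""

       def __init__(self, n):
           self.payload = np.random.random(n)


   @sizeof.register(Blob)
   def sizeof_blob(obj):
       # Count the wrapped array so managed-memory accounting stays honest;
       # the extra 64 bytes of object overhead is an assumption.
       return sizeof(obj.payload) + 64


   print(sizeof(Blob(1_000_000)))  # now ~8 MB instead of a few dozen bytes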

Pause worker
~~~~~~~~~~~~
At 80% :ref:`process memory <memtypes>` load, the worker's thread pool will stop
starting computation on additional tasks in the worker's queue. This gives time for the
write-to-disk functionality to take effect even in the face of rapidly accumulating
data. Currently executing tasks continue to run. Additionally, data transfers to/from
other workers are throttled to a bare minimum.
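
To check from a client whether any workers are currently paused, one possible approach
(a sketch, not from the original documentation; the scheduler address is a placeholder)
is to ask each worker for its status:

.. code-block:: python

   from dask.distributed import Client

   client = Client("tcp://scheduler:8786")  # placeholder address

   # Functions passed to Client.run that accept a `dask_worker` keyword
   # receive the local Worker instance on each worker.
   statuses = client.run(lambda dask_worker: dask_worker.status.name)
   paused = [addr for addr, status in statuses.items() if status == "paused"]
   print(paused)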

Kill worker
~~~~~~~~~~~
At 95% :ref:`process memory <memtypes>` load (*terminate threshold*), a worker's nanny
process will terminate it. Tasks will be cancelled mid-execution and rescheduled
elsewhere; all unique data on the worker will be lost and will need to be recomputed.
This is to avoid having the worker job terminated by an external watchdog (like
Kubernetes, YARN, Mesos, SGE, etc.). After termination, the nanny will restart the
worker in a fresh state.

Thresholds configuration
~~~~~~~~~~~~~~~~~~~~~~~~
These values can be configured by modifying the ``~/.config/dask/distributed.yaml``
file:

At 95% memory load, a worker's nanny process will terminate it. This is to
avoid having our worker job being terminated by an external job scheduler (like
YARN, Mesos, SGE, etc..). After termination, the nanny will restart the worker
in a fresh state.
.. code-block:: yaml

   distributed:
     worker:
       # Fractions of worker process memory at which we take action to avoid memory
       # blowup. Set any of the values to False to turn off the behavior entirely.
       memory:
         target: 0.60     # fraction of managed memory where we start spilling to disk
         spill: 0.70      # fraction of process memory where we start spilling to disk
         pause: 0.80      # fraction of process memory at which we pause worker threads
         terminate: 0.95  # fraction of process memory at which we terminate the worker
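
The same keys can also be set programmatically before the workers are created; the
snippet below is a sketch with illustrative values, not a recommendation:

.. code-block:: python

   import dask
   from dask.distributed import LocalCluster

   # Set in the process that launches the workers, before they start
   dask.config.set({
       "distributed.worker.memory.target": 0.50,
       "distributed.worker.memory.spill": 0.60,
       "distributed.worker.memory.pause": 0.75,
       "distributed.worker.memory.terminate": 0.90,
   })
   cluster = LocalCluster(n_workers=2, memory_limit="4 GiB")

The equivalent environment variable form for the first key would be
``DASK_DISTRIBUTED__WORKER__MEMORY__TARGET=0.50``.
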
Using the dashboard to monitor memory usage
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The dashboard (typically available on port 8787) shows a summary of the overall memory
usage on the cluster, as well as the individual usage on each worker. It provides
different memory readings:

.. _memtypes:

process
Overall memory used by the worker process (RSS), as measured by the OS

managed
This is the sum of the ``sizeof`` of all dask data stored on the worker, excluding
Sum of the ``sizeof`` of all Dask data stored on the worker, excluding
spilled data.

unmanaged
This is the memory usage that dask is not directly aware of. It is estimated by
subtracting managed memory from the total process memory and typically includes:
Memory usage that Dask is not directly aware of. It is estimated by subtracting
managed memory from the total process memory and typically includes:

- The Python interpreter code, loaded modules, and global variables
- Memory temporarily used by running tasks
@@ -284,7 +269,7 @@ unmanaged recent

By default, :meth:`distributed.Client.rebalance` and
:meth:`distributed.scheduler.Scheduler.rebalance` ignore unmanaged recent memory.
This behaviour can also be tweaked using the dask config - see the methods'
This behaviour can also be tweaked using the Dask config - see the methods'
documentation.

spilled
@@ -403,7 +388,7 @@ documentation.

Ignore process memory
~~~~~~~~~~~~~~~~~~~~~
If all else fails, you may want to stop dask from using memory metrics from the OS (RSS)
If all else fails, you may want to stop Dask from using memory metrics from the OS (RSS)
in its decision-making:

.. code-block:: yaml
