diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml
index 07abe57d874..793da7b3f33 100644
--- a/distributed/distributed.yaml
+++ b/distributed/distributed.yaml
@@ -140,10 +140,10 @@ distributed:
       # Fractions of worker process memory at which we take action to avoid memory
       # blowup. Set any of the values to False to turn off the behavior entirely.
-      target: 0.60  # target fraction to stay below
-      spill: 0.70  # fraction at which we spill to disk
-      pause: 0.80  # fraction at which we pause worker threads
-      terminate: 0.95  # fraction at which we terminate the worker
+      target: 0.60  # fraction of managed memory where we start spilling to disk
+      spill: 0.70  # fraction of process memory where we start spilling to disk
+      pause: 0.80  # fraction of process memory at which we pause worker threads
+      terminate: 0.95  # fraction of process memory at which we terminate the worker

       # Max size of the spill file on disk (e.g. "10 GB")
       # Set to false for no maximum.
diff --git a/docs/source/worker.rst b/docs/source/worker.rst
index 9a4cad0c519..eafaee1568f 100644
--- a/docs/source/worker.rst
+++ b/docs/source/worker.rst
@@ -149,7 +149,6 @@ exceptions to this are when:

 Memory Management
 -----------------
-
 Workers are given a target memory limit to stay under with the
 command line ``--memory-limit`` keyword or the ``memory_limit=`` Python
 keyword argument, which sets the memory limit per worker processes launched
@@ -160,89 +159,73 @@ by dask-worker ::

 Workers use a few different heuristics to keep memory use beneath this limit:

-1. At 60% of memory load (as estimated by ``sizeof``), spill least recently used data
-   to disk.
-2. At 70% of memory load (as reported by the OS), spill least recently used data to
-   disk regardless of what is reported by ``sizeof``; this accounts for memory used by
-   the python interpreter, modules, global variables, memory leaks, etc. The spilling
-   stops when the memory goes below 60%, in a hysteresis cycle.
-3. At 80% of memory load (as reported by the OS), stop accepting new work on local
-   thread pool.
-4. At 95% of memory load (as reported by the OS), terminate and restart the worker.
-
-These values can be configured by modifying the ``~/.config/dask/distributed.yaml``
-file:
-
-.. code-block:: yaml
-
-   distributed:
-     worker:
-       # Fractions of worker memory at which we take action to avoid memory blowup
-       # Set any of the lower three values to False to turn off the behavior entirely
-       memory:
-         target: 0.60  # target fraction to stay below
-         spill: 0.70  # fraction at which we spill to disk
-         pause: 0.80  # fraction at which we pause worker threads
-         terminate: 0.95  # fraction at which we terminate the worker
-
-It is possible to individually disable any of these by setting its value to False.
-Setting 'target' while leaving 'spill' active disables the spill hysteresis cycle.
-
-
-Spill data to disk
-~~~~~~~~~~~~~~~~~~
+Spilling based on managed memory
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+Every time the worker finishes a task, it estimates the size in bytes that the result
+costs to keep in memory using the ``sizeof`` function. This function defaults to
+:func:`sys.getsizeof` for arbitrary objects, which uses the standard Python
+``__sizeof__`` protocol, but also has special-cased implementations for common data
+types like NumPy arrays and Pandas dataframes. The sum of the ``sizeof`` of all data
+tracked by Dask is called :ref:`managed memory <memtypes>`.
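+
+As a quick illustration (a sketch, not an exhaustive description of the sizing logic),
+Dask's ``sizeof`` function can be called directly to see what will be counted as
+managed memory:
+
+.. code-block:: python
+
+    import numpy as np
+    from dask.sizeof import sizeof  # the sizing function described above
+
+    sizeof(np.zeros(1_000_000))  # ~8_000_000 bytes, via the NumPy special case
+
+    class C:
+        pass
+
+    sizeof(C())  # arbitrary objects fall back to sys.getsizeof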
-Every time the worker finishes a task it estimates the size in bytes that the
-result costs to keep in memory using the ``sizeof`` function. This function
-defaults to :func:`sys.getsizeof` for arbitrary objects, which uses the standard
-Python ``__sizeof__`` protocol, but also has special-cased implementations for
-common data types like NumPy arrays and Pandas dataframes.
-
-When the sum of the number of bytes of the data in memory exceeds 60% of the
-memory limit, the worker will begin to dump the least recently used data
-to disk. You can control this location with the ``--local-directory``
-keyword.::
+When the managed memory exceeds 60% of the memory limit (*target threshold*), the worker
+will begin to dump the least recently used data to disk. You can control this location
+with the ``--local-directory`` keyword::

    $ dask-worker tcp://scheduler:port --memory-limit="4 GiB" --local-directory /scratch

-That data is still available and will be read back from disk when necessary.
-On the diagnostic dashboard status page disk I/O will show up in the task
-stream plot as orange blocks. Additionally, the memory plot in the upper left
-will become yellow and then red.
-
-
-Monitor process memory load
-~~~~~~~~~~~~~~~~~~~~~~~~~~~
+That data is still available and will be read back from disk when necessary. On the
+diagnostic dashboard status page, disk I/O will show up in the task stream plot as
+orange blocks. Additionally, the memory plot in the upper left will show a section of
+the bar colored in grey.
+
+Spilling based on process memory
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 The approach above can fail for a few reasons:

 1. Custom objects may not report their memory size accurately
 2. User functions may take up more RAM than expected
 3. Significant amounts of data may accumulate in network I/O buffers

-To address this we periodically monitor the memory of the worker process every
-200 ms. If the system reported memory use is above 70% of the target memory
-usage then the worker will start dumping unused data to disk, even if internal
-``sizeof`` recording hasn't yet reached the normal 60% limit.
-
-
-Halt worker threads
-~~~~~~~~~~~~~~~~~~~
-
-At 80% load, the worker's thread pool will stop starting computation on
-additional tasks in the worker's queue. This gives time for the write-to-disk
-functionality to take effect even in the face of rapidly accumulating data.
-Currently executing tasks continue to run.
+To address this, we periodically monitor the :ref:`process memory <memtypes>` of the
+worker every 200 ms. If the system-reported memory use is above 70% of the target memory
+usage (*spill threshold*), then the worker will start dumping unused data to disk, even
+if internal ``sizeof`` recording hasn't yet reached the normal 60% threshold. This
+more aggressive spilling will continue until process memory falls below 60%.
+
+Pause worker
+~~~~~~~~~~~~
+At 80% :ref:`process memory <memtypes>` load, the worker's thread pool will stop
+starting computation on additional tasks in the worker's queue. This gives time for the
+write-to-disk functionality to take effect even in the face of rapidly accumulating
+data. Currently executing tasks continue to run. Additionally, data transfers to/from
+other workers are throttled to a bare minimum.

 Kill Worker
 ~~~~~~~~~~~
+At 95% :ref:`process memory <memtypes>` load (*terminate threshold*), a worker's nanny
+process will terminate it. Tasks will be cancelled mid-execution and rescheduled
+elsewhere; all unique data on the worker will be lost and will need to be recomputed.
+This is to avoid having our worker job terminated by an external watchdog (like
+Kubernetes, YARN, Mesos, SGE, etc.). After termination, the nanny will restart the
+worker in a fresh state.
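+
+To make the default fractions concrete, here is a rough sketch (assuming the
+``--memory-limit="4 GiB"`` example above and the default thresholds listed below) of the
+absolute values at which each action kicks in:
+
+.. code-block:: python
+
+    memory_limit = 4 * 2**30         # --memory-limit="4 GiB", in bytes
+
+    target = 0.60 * memory_limit     # ~2.4 GiB of managed memory: start spilling to disk
+    spill = 0.70 * memory_limit      # ~2.8 GiB of process memory: spill more aggressively
+    pause = 0.80 * memory_limit      # ~3.2 GiB of process memory: pause the thread pool
+    terminate = 0.95 * memory_limit  # ~3.8 GiB of process memory: the nanny restarts the worker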
+
+Thresholds configuration
+~~~~~~~~~~~~~~~~~~~~~~~~
+These values can be configured by modifying the ``~/.config/dask/distributed.yaml``
+file:

-At 95% memory load, a worker's nanny process will terminate it. This is to
-avoid having our worker job being terminated by an external job scheduler (like
-YARN, Mesos, SGE, etc..). After termination, the nanny will restart the worker
-in a fresh state.
+.. code-block:: yaml
+
+    distributed:
+      worker:
+        # Fractions of worker process memory at which we take action to avoid memory
+        # blowup. Set any of the values to False to turn off the behavior entirely.
+        memory:
+          target: 0.60  # fraction of managed memory where we start spilling to disk
+          spill: 0.70  # fraction of process memory where we start spilling to disk
+          pause: 0.80  # fraction of process memory at which we pause worker threads
+          terminate: 0.95  # fraction of process memory at which we terminate the worker

 Using the dashboard to monitor memory usage
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -250,16 +233,18 @@ The dashboard (typically available on port 8787) shows a summary of the overall
 usage on the cluster, as well as the individual usage on each worker. It provides
 different memory readings:

+.. _memtypes:
+
 process
     Overall memory used by the worker process (RSS), as measured by the OS

 managed
-    This is the sum of the ``sizeof`` of all dask data stored on the worker, excluding
+    Sum of the ``sizeof`` of all Dask data stored on the worker, excluding
     spilled data.

 unmanaged
-    This is the memory usage that dask is not directly aware of. It is estimated by
-    subtracting managed memory from the total process memory and typically includes:
+    Memory usage that Dask is not directly aware of. It is estimated by subtracting
+    managed memory from the total process memory and typically includes:

     - The Python interpreter code, loaded modules, and global variables
     - Memory temporarily used by running tasks
@@ -284,7 +269,7 @@ unmanaged recent

     By default, :meth:`distributed.Client.rebalance` and
     :meth:`distributed.scheduler.Scheduler.rebalance` ignore unmanaged recent memory.
-    This behaviour can also be tweaked using the dask config - see the methods'
+    This behaviour can also be tweaked using the Dask config - see the methods'
     documentation.

 spilled
@@ -403,7 +388,7 @@ documentation.
 Ignore process memory
 ~~~~~~~~~~~~~~~~~~~~~

-If all else fails, you may want to stop dask from using memory metrics from the OS (RSS)
+If all else fails, you may want to stop Dask from using memory metrics from the OS (RSS)
 in its decision-making:

 .. code-block:: yaml
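
     # Illustrative sketch: the spill, pause, and terminate actions are the ones driven
     # by process memory, and each can be turned off entirely by setting its threshold
     # to false (as noted in the threshold comments above).
     distributed:
       worker:
         memory:
           spill: false
           pause: false
           terminate: false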