diff --git a/distributed/active_memory_manager.py b/distributed/active_memory_manager.py
index 0bcd83522c..7f3df5d966 100644
--- a/distributed/active_memory_manager.py
+++ b/distributed/active_memory_manager.py
@@ -304,9 +304,9 @@ def run(
         You may optionally retrieve which worker it was decided the key will be
         replicated to or dropped from, as follows:
 
-        ```python
-        choice = yield "replicate", ts, None
-        ```
+        .. code-block:: python
+
+            choice = (yield "replicate", ts, None)
 
         ``choice`` is either a WorkerState or None; the latter is returned if the
         ActiveMemoryManager chose to disregard the request.
diff --git a/docs/source/active_memory_manager.rst b/docs/source/active_memory_manager.rst
new file mode 100644
index 0000000000..3fd5a68f23
--- /dev/null
+++ b/docs/source/active_memory_manager.rst
@@ -0,0 +1,247 @@
+Active Memory Manager
+=====================
+The Active Memory Manager, or *AMM*, is an experimental daemon that optimizes memory
+usage of workers across the Dask cluster. It is disabled by default.
+
+
+Memory imbalance and duplication
+--------------------------------
+Whenever a Dask task returns data, it is stored on the worker that executed the task for
+as long as it's a dependency of other tasks, is referenced by a ``Client`` through a
+``Future``, or is part of a :doc:`published dataset <publish>`.
+
+Dask assigns tasks to workers following criteria of CPU occupancy, :doc:`resources`, and
+locality. In the trivial use case of tasks that are not connected to each other, take
+the same time to compute, return data of the same size, and have no resource
+constraints, one will observe a perfect balance in memory occupation across workers too.
+In all other use cases, however, memory usage may become imbalanced as the computation
+progresses.
+
+When a task runs on a worker and requires as input the output of a task from a different
+worker, Dask will transparently transfer the data between workers, ending up with
+multiple copies of the same data on different workers. This is generally desirable, as
+it avoids re-transferring the data if it's required again later on. However, it also
+causes increased overall memory usage across the cluster.
+
+
+Enabling the Active Memory Manager
+----------------------------------
+The AMM can be enabled through the :doc:`Dask configuration file <configuration>`:
+
+.. code-block:: yaml
+
+    distributed:
+      scheduler:
+        active-memory-manager:
+          start: true
+          interval: 2s
+
+The above is the recommended setup and will run all enabled *AMM policies* (see below)
+every two seconds. Alternatively, you can manually start/stop the AMM from the
+``Client`` or trigger a one-off iteration:
+
+.. code-block:: python
+
+    >>> client.scheduler.amm_start()  # Start running every 2 seconds
+    >>> client.scheduler.amm_stop()  # Stop running periodically
+    >>> client.scheduler.amm_run_once()
+
+
+Policies
+--------
+The AMM by itself doesn't do anything. The user must enable *policies*, which suggest
+actions regarding Dask data. The AMM runs the policies and enacts their suggestions, as
+long as they don't harm data integrity. These suggestions can be of two types:
+
+- Replicate the data of an in-memory Dask task from one worker to another.
+  This should not be confused with replication caused by task dependencies.
+- Delete one or more replicas of an in-memory task. The AMM will never delete the last
+  replica of a task, even if a policy asks to.
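+
+For example, a minimal policy emitting such suggestions could look like the sketch
+below; the class name and the key ``"x"`` are made up for illustration, and the full
+policy API is described under *Custom policies* below:
+
+.. code-block:: python
+
+    from distributed.active_memory_manager import ActiveMemoryManagerPolicy
+
+
+    class DropExtraReplicasOfX(ActiveMemoryManagerPolicy):
+        """Illustrative sketch: suggest dropping surplus replicas of key "x"."""
+
+        def run(self):
+            ts = self.manager.scheduler.tasks.get("x")
+            if ts is None:
+                return
+            # Suggest dropping all but one replica; the AMM only enacts what is safe
+            for _ in range(len(ts.who_has) - 1):
+                yield "drop", ts, None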
+
+Unless a policy puts constraints on which workers should be impacted, the AMM will
+automatically create replicas on workers with the lowest memory usage first and delete
+them from workers with the highest memory usage first.
+
+Individual policies are enabled, disabled, and configured through the Dask config:
+
+.. code-block:: yaml
+
+    distributed:
+      scheduler:
+        active-memory-manager:
+          start: true
+          interval: 2s
+          policies:
+            - class: distributed.active_memory_manager.ReduceReplicas
+            - class: my_package.MyPolicy
+              arg1: foo
+              arg2: bar
+
+See below for how to write custom policies like ``MyPolicy`` in the example above.
+
+The default Dask config file contains a sane selection of built-in policies that should
+be generally desirable. You should try first with just ``start: true`` in your Dask
+config and see if it fits your needs before tweaking individual policies.
+
+
+Built-in policies
+-----------------
+ReduceReplicas
+++++++++++++++
+class
+    ``distributed.active_memory_manager.ReduceReplicas``
+parameters
+    None
+
+This policy is enabled in the default Dask config. Whenever a Dask task is replicated
+on more than one worker and the additional replicas don't appear to serve an ongoing
+computation, this policy drops all excess replicas.
+
+.. note::
+    This policy is incompatible with :meth:`~distributed.Client.replicate` and with the
+    ``broadcast=True`` parameter of :meth:`~distributed.Client.scatter`. If you invoke
+    ``replicate`` to create additional replicas and then later run this policy, it will
+    delete all replicas but one (but not necessarily the new ones).
+
+
+Custom policies
+---------------
+Power users can write their own policies by subclassing
+:class:`~distributed.active_memory_manager.ActiveMemoryManagerPolicy`. The class should
+define two methods:
+
+``__init__``
+    A custom policy may load parameters from the Dask config through ``__init__``
+    parameters. If you don't need configuration, you don't need to implement this
+    method.
+``run``
+    This method accepts no parameters and is invoked by the AMM every 2 seconds (or
+    whatever the AMM interval is).
+    It must yield zero or more of the following *suggestion* tuples:
+
+    ``yield "replicate", <TaskState>, None``
+        Create one replica of the target task on the worker with the lowest memory usage
+        that doesn't hold a replica yet. To create more than one replica, you need to
+        yield the same command more than once.
+    ``yield "replicate", <TaskState>, {<WorkerState>, <WorkerState>, ...}``
+        Create one replica of the target task on the worker with the lowest memory among
+        the listed candidates.
+    ``yield "drop", <TaskState>, None``
+        Delete one replica of the target task on the worker with the highest memory
+        usage across the whole cluster.
+    ``yield "drop", <TaskState>, {<WorkerState>, <WorkerState>, ...}``
+        Delete one replica of the target task on the worker with the highest memory
+        among the listed candidates.
+
+    The AMM will silently reject unacceptable suggestions, such as:
+
+    - Delete the last replica of a task
+    - Delete a replica from a subset of workers that don't hold any
+    - Delete a replica from a worker that currently needs it for computation
+    - Replicate a task that is not yet in memory
+    - Create more replicas of a task than there are workers
+    - Create replicas of a task on workers that already hold them
+    - Create replicas on paused or retiring workers
+
+    It is generally a good idea to design policies to be as simple as possible and let
+    the AMM take care of the edge cases above by ignoring some of the suggestions.
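+
+    As a sketch of what these tuples look like in practice, the snippet below shows all
+    four forms inside a ``run`` method; the key ``"x"`` is hypothetical and the
+    candidate set is an arbitrary subset of workers picked purely to illustrate the
+    syntax:
+
+    .. code-block:: python
+
+        def run(self):
+            # Not a sensible policy by itself; it only demonstrates the four forms
+            ts = self.manager.scheduler.tasks.get("x")  # hypothetical key
+            if ts is None:
+                return
+            candidates = set(list(self.manager.scheduler.workers.values())[:2])
+            yield "replicate", ts, None        # lowest-memory worker anywhere
+            yield "replicate", ts, candidates  # lowest-memory among the candidates
+            yield "drop", ts, None             # highest-memory worker anywhere
+            yield "drop", ts, candidates       # highest-memory among the candidates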
+
+    Optionally, the ``run`` method may retrieve which worker the AMM just selected, as
+    follows:
+
+    .. code-block:: python
+
+        ws = (yield "drop", ts, None)
+
+The ``run`` method can access the following attributes:
+
+``self.manager``
+    The :class:`~distributed.active_memory_manager.ActiveMemoryManagerExtension` that
+    the policy is attached to
+``self.manager.scheduler``
+    :class:`~distributed.Scheduler` to which the suggestions will be applied. From
+    there you can access various attributes such as ``tasks`` and ``workers``.
+``self.manager.workers_memory``
+    Read-only mapping of ``{WorkerState: bytes}``. ``bytes`` is the expected RAM usage
+    of the worker once all the suggestions accepted so far in the current AMM
+    iteration, from all policies, have been enacted. Note that you don't need to access
+    this if you are happy to always create/delete replicas on the workers with the
+    lowest and highest memory usage respectively - the AMM will handle it for you.
+``self.manager.pending``
+    Read-only mapping of ``{TaskState: ({<WorkerState>, ...}, {<WorkerState>, ...})}``.
+    The first set contains the workers that will receive a new replica of the task
+    according to the suggestions accepted so far; the second set contains the workers
+    which will lose a replica.
+``self.manager.policies``
+    Set of policies registered in the AMM. A policy can deregister itself as follows:
+
+    .. code-block:: python
+
+        def run(self):
+            self.manager.policies.remove(self)
+
+Example
++++++++
+The following custom policy ensures that keys "foo" and "bar" are replicated on all
+workers at all times. New workers will receive a replica soon after connecting to the
+scheduler. The policy will do nothing if the target keys are not in memory somewhere or
+if all workers already hold a replica.
+Note that this example is incompatible with the ``ReduceReplicas`` built-in policy.
+
+In mymodule.py (it must be accessible by the scheduler):
+
+.. code-block:: python
+
+    from distributed.active_memory_manager import ActiveMemoryManagerPolicy
+
+
+    class EnsureBroadcast(ActiveMemoryManagerPolicy):
+        def __init__(self, key):
+            self.key = key
+
+        def run(self):
+            ts = self.manager.scheduler.tasks.get(self.key)
+            if not ts:
+                return
+            for _ in range(len(self.manager.scheduler.workers) - len(ts.who_has)):
+                yield "replicate", ts, None
+
+Note that the policy doesn't bother testing for edge cases such as paused workers or
+other policies also requesting replicas; the AMM takes care of it. In theory you could
+rewrite the last two lines as follows (at the cost of some wasted CPU cycles):
+
+.. code-block:: python
+
+    for _ in range(1000):
+        yield "replicate", ts, None
+
+In distributed.yaml:
+
+.. code-block:: yaml
+
+    distributed:
+      scheduler:
+        active-memory-manager:
+          start: true
+          interval: 2s
+          policies:
+            - class: mymodule.EnsureBroadcast
+              key: foo
+            - class: mymodule.EnsureBroadcast
+              key: bar
+
+We could have alternatively used a single policy instance with a list of keys - the
+above design merely illustrates that you may have multiple instances of the same policy
+running side by side.
+
+
+API reference
+-------------
+.. autoclass:: distributed.active_memory_manager.ActiveMemoryManagerExtension
+   :members:
+
+.. autoclass:: distributed.active_memory_manager.ActiveMemoryManagerPolicy
+   :members:
+
+.. autoclass:: distributed.active_memory_manager.ReduceReplicas
diff --git a/docs/source/conf.py b/docs/source/conf.py
index a36a5826f1..e2a7d4b287 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -147,7 +147,7 @@
 # Add any paths that contain custom static files (such as style sheets) here,
 # relative to this directory. They are copied after the builtin static files,
 # so a file named "default.css" will overwrite the builtin "default.css".
-html_static_path = ["_static"]
+html_static_path: list[str] = []
 
 # Add any extra paths that contain custom files (such as robots.txt or
 # .htaccess) here, relative to this directory. These files are copied
diff --git a/docs/source/diagnosing-performance.rst b/docs/source/diagnosing-performance.rst
index cf8e85aa1c..a52e23203b 100644
--- a/docs/source/diagnosing-performance.rst
+++ b/docs/source/diagnosing-performance.rst
@@ -19,9 +19,8 @@ identify performance issues.
 Fortunately, Dask collects a variety of diagnostic information during
 execution. It does this both to provide performance feedback to users, but
 also for its own internal scheduling decisions. The primary place to observe
-this feedback is the :doc:`diagnostic dashboard `. This document
-describes the various pieces of performance information available and how to
-access them.
+this feedback is the diagnostic dashboard. This document describes the various
+pieces of performance information available and how to access them.
 
 
 Task start and stop times
diff --git a/docs/source/index.rst b/docs/source/index.rst
index 4364a97c95..6535ac5103 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -109,14 +109,13 @@ Contents
 
    actors
    asynchronous
-   configuration
   ipython
-   prometheus
   http_services
   publish
   resources
   task-launch
   tls
+   active_memory_manager
 
 .. toctree::
    :maxdepth: 1
diff --git a/docs/source/tls.rst b/docs/source/tls.rst
index 0c635b8576..f3ad2fc31e 100644
--- a/docs/source/tls.rst
+++ b/docs/source/tls.rst
@@ -50,7 +50,7 @@ One can also pass additional parameters:
 
 All those parameters can be passed in several ways:
 
-* through the Dask :ref:`configuration file `;
+* through the Dask :doc:`configuration file <configuration>`;
 * if using the command line, through options to ``dask-scheduler``
   and ``dask-worker``;
 * if using the API, through a ``Security`` object. For example, here is