Active Memory Manager framework + discard excess replicas #5111

Merged
merged 5 commits, Sep 7, 2021
Changes from 4 commits
305 changes: 305 additions & 0 deletions distributed/active_memory_manager.py
@@ -0,0 +1,305 @@
from __future__ import annotations

import asyncio
from collections import defaultdict
from collections.abc import Generator
from typing import TYPE_CHECKING, Optional

from tornado.ioloop import PeriodicCallback

import dask
from dask.utils import parse_timedelta

from .utils import import_term

if TYPE_CHECKING:
from .scheduler import SchedulerState, TaskState, WorkerState


class ActiveMemoryManagerExtension:
"""Scheduler extension that optimizes memory usage across the cluster.
It can be either triggered by hand or automatically every few seconds; at every
iteration it performs one or both of the following:

- create new replicas of in-memory tasks
- destroy replicas of in-memory tasks; this never destroys the last available copy.

There are no 'move' operations. A move is performed in two passes: first you create
a copy and, in the next iteration, you delete the original (if the copy succeeded).

This extension is configured by the dask config section
``distributed.scheduler.active-memory-manager``.
"""

scheduler: SchedulerState
policies: set[ActiveMemoryManagerPolicy]
interval: float
Member:
In the future I expect that different policies will want to operate on different timelines. For example policies that are more expensive to run might run much less frequently.

Collaborator Author:
This sounds like a desirable feature; however, is it OK to leave it for the future?

Member:
Sounds good to me


# These attributes only exist within the scope of self.run()
# Current memory (in bytes) allocated on each worker, plus/minus pending actions
workers_memory: dict[WorkerState, int]
# Pending replications and deletions for each task
pending: defaultdict[TaskState, tuple[set[WorkerState], set[WorkerState]]]

def __init__(
self,
scheduler: SchedulerState,
# The following parameters are exposed so that one may create, run, and throw
# away on the fly a specialized manager, separate from the main one.
policies: set[ActiveMemoryManagerPolicy] = None,
register: bool = True,
start: bool = None,
interval: float = None,
):
self.scheduler = scheduler

if policies is None:
policies = set()
for kwargs in dask.config.get(
"distributed.scheduler.active-memory-manager.policies"
):
kwargs = kwargs.copy()
cls = import_term(kwargs.pop("class"))
if not issubclass(cls, ActiveMemoryManagerPolicy):
raise TypeError(
f"{cls}: Expected ActiveMemoryManagerPolicy; got {type(cls)}"
)
policies.add(cls(**kwargs))

for policy in policies:
policy.manager = self
self.policies = policies

if register:
scheduler.extensions["amm"] = self
scheduler.handlers.update(
{
"amm_run_once": self.run_once,
"amm_start": self.start,
"amm_stop": self.stop,
}
)

if interval is None:
interval = parse_timedelta(
dask.config.get("distributed.scheduler.active-memory-manager.interval")
)
self.interval = interval
if start is None:
start = dask.config.get("distributed.scheduler.active-memory-manager.start")
if start:
self.start()
Member:
Are there situations where we would want the extension but not have it running? If this is speculatively valuable then I recommend that we leave it off for now until we need it.

Collaborator Author:
It's because of how scheduler extensions work: from a user's perspective, I think it's a lot more user-friendly to have the extension always loaded through DEFAULT_EXTENSIONS and available either through client.scheduler.amm_start() / client.scheduler.amm_run_once() or through a config switch, instead of having to load it with

def load_amm(dask_scheduler):
    from distributed.active_memory_manager import ActiveMemoryManagerExtension
    ActiveMemoryManagerExtension(dask_scheduler)

client.run_on_scheduler(load_amm)

An alternative to the start key would be to move DEFAULT_EXTENSIONS to the dask config, but that would carry the risk of users falling behind: you can't change a list in the dask config, only override the whole list, so if a user overrode it in their custom dask config and we later added a default extension, they wouldn't get it.
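
For illustration, here is a minimal sketch (not part of this PR's diff) of the handler-based path described above, using the amm_* handlers registered in __init__; the cluster setup is made up:

```python
# Illustrative sketch only; assumes the extension is loaded by default and
# exposes the "amm_run_once" / "amm_start" / "amm_stop" handlers added above.
import distributed

client = distributed.Client()               # LocalCluster, just for illustration
client.sync(client.scheduler.amm_run_once)  # trigger a single AMM iteration
client.sync(client.scheduler.amm_start)     # or start periodic execution
client.sync(client.scheduler.amm_stop)      # and stop it again
```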


def start(self, comm=None) -> None:
"""Start executing very <interval> seconds until scheduler shutdown"""
crusaderky marked this conversation as resolved.
Show resolved Hide resolved
pc = PeriodicCallback(self.run_once, self.interval * 1000.0)
self.scheduler.periodic_callbacks["amm"] = pc
pc.start()

def stop(self, comm=None) -> None:
"""Stop periodic execution"""
pc = self.scheduler.periodic_callbacks.pop("amm", None)
if pc:
pc.stop()

def run_once(self, comm=None) -> None:
"""Run all policies once and asynchronously (fire and forget) enact their
recommendations to replicate/drop keys
"""
# This should never fail since this is a synchronous method
assert not hasattr(self, "pending")

self.pending = defaultdict(lambda: (set(), set()))
self.workers_memory = {
w: w.memory.optimistic for w in self.scheduler.workers.values()
Collaborator:
Over in #4925 (comment) we discovered that iterating over the .values() of a SortedDict was surprisingly slow. Maybe this is an over-optimization, but you could consider replacing scheduler.workers.values() with dict.values(scheduler.workers) (like I did here) throughout this PR, which might make these lines an order of magnitude faster.

Member:
I'll push back on micro-optimizing the worker state machine. It's not a bottleneck in performance but is often a stability concern. I think that we should optimize pretty hard for readability.
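
For readers following along, a sketch of the two spellings under discussion; the order-of-magnitude figure is the claim from the linked thread, not re-measured here:

```python
# scheduler.workers is a SortedDict; both loops visit the same WorkerState objects.
for ws in scheduler.workers.values():      # goes through SortedDict.values()
    ...

for ws in dict.values(scheduler.workers):  # calls the plain dict implementation,
    ...                                    # bypassing the SortedDict machinery
```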

}
try:
# populate self.pending
self._run_policies()

drop_by_worker = defaultdict(set)
repl_by_worker = defaultdict(dict)
for ts, (pending_repl, pending_drop) in self.pending.items():
if not ts.who_has:
continue
who_has = [ws_snd.address for ws_snd in ts.who_has - pending_drop]
assert who_has # Never drop the last replica
for ws_rec in pending_repl:
assert ws_rec not in ts.who_has
repl_by_worker[ws_rec.address][ts.key] = who_has
for ws in pending_drop:
assert ws in ts.who_has
drop_by_worker[ws.address].add(ts.key)

# Fire-and-forget enact recommendations from policies
# This is temporary code, waiting for
# https://github.com/dask/distributed/pull/5046
for addr, who_has in repl_by_worker.items():
asyncio.create_task(self.scheduler.gather_on_worker(addr, who_has))
for addr, keys in drop_by_worker.items():
asyncio.create_task(self.scheduler.delete_worker_data(addr, keys))
# End temporary code

finally:
del self.workers_memory
del self.pending

def _run_policies(self) -> None:
"""Sequentially run ActiveMemoryManagerPolicy.run() for all registered policies,
obtain replicate/drop suggestions, and use them to populate self.pending.
"""
candidates: Optional[set[WorkerState]]
cmd: str
ws: Optional[WorkerState]
ts: TaskState
nreplicas: int

for policy in list(self.policies): # a policy may remove itself
policy_gen = policy.run()
ws = None
while True:
try:
cmd, ts, candidates = policy_gen.send(ws)
Member:
I have to admit that I did not even know about this feature. I don't see it being used anywhere. Do you already have an application in mind, or even an example where this is used? Merely out of curiosity; I don't mind keeping it in this PR.

Member:
Great, thanks.

For other readers who were not aware of this feature, it is described in PEP 342

Thanks
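
For readers unfamiliar with the feature, here is a minimal standalone sketch of the two-way generator protocol from PEP 342 that policy_gen.send(ws) relies on; all names below are made up:

```python
def toy_policy():
    # The value of the yield expression is whatever the caller passes to send()
    chosen = yield "replicate", "task-x", None
    print(f"the manager picked {chosen}")

gen = toy_policy()
cmd, key, candidates = next(gen)  # run up to the first yield
try:
    gen.send("worker-1")          # resume; 'chosen' becomes "worker-1"
except StopIteration:
    pass                          # the generator finished, as in the loop above
```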

except StopIteration:
break # next policy

pending_repl, pending_drop = self.pending[ts]

if cmd == "replicate":
ws = self._find_recipient(ts, candidates, pending_repl)
if ws:
pending_repl.add(ws)
self.workers_memory[ws] += ts.nbytes

elif cmd == "drop":
ws = self._find_dropper(ts, candidates, pending_drop)
if ws:
pending_drop.add(ws)
self.workers_memory[ws] = max(
0, self.workers_memory[ws] - ts.nbytes
)

else:
raise ValueError(f"Unknown command: {cmd}") # pragma: nocover

def _find_recipient(
self,
ts: TaskState,
candidates: Optional[set[WorkerState]],
pending_repl: set[WorkerState],
) -> Optional[WorkerState]:
"""Choose a worker to acquire a new replica of an-in-memory task among a set of
crusaderky marked this conversation as resolved.
Show resolved Hide resolved
candidates. If candidates is None, default to all workers in the cluster that do
not hold a replica yet. The worker with the lowest memory usage (downstream of
pending replications and drops) will be returned.
"""
if ts.state != "memory":
return None
if candidates is None:
candidates = set(self.scheduler.workers.values())
candidates -= ts.who_has
candidates -= pending_repl
if not candidates:
return None
# id(ws) is there just to prevent WorkerState objects from being compared in the
# unlikely event that they have exactly the same amount of bytes allocated.
_, _, ws = min((self.workers_memory[ws], id(ws), ws) for ws in candidates)
Member:
Thoughts on making a couple of heaps of workers ordered by memory usage (one min-ordered and one max-ordered)?

We could do this once at the beginning of a cycle and then use that collection to help order our actions. This might be a way of taking some of the same tricks that you built up in the replicate work, but applying them more generally across all policies. I suspect that by doing this once per cycle over workers and using these data structures consistently that we might be able to reduce the overhead for lots of different operations.

Collaborator Author:
No, because you need to pick the min/max worker from a shortlist, which can be arbitrary. For example, a user may want to replicate a GPU-related key exclusively to GPU-enabled workers.
With heaps, you'd need to cycle through the heap until you hit an element of the shortlist - which risks not only negating the performance benefit but making the runtime substantially worse in some use cases.

Also, this operation is O(n), but in the case of dropping n is the number of workers that hold a replica - meaning, 2 or 3 in most cases. In the case of replica increase it can be most workers on the cluster, but since we need to do this full scan only for the keys that need to be replicated, I expect it to be fast enough not to care.

return ws

def _find_dropper(
self,
ts: TaskState,
candidates: Optional[set[WorkerState]],
pending_drop: set[WorkerState],
) -> Optional[WorkerState]:
"""Choose a worker to drop its replica of an in-memory task among a set of
candidates. If candidates is None, default to all workers in the cluster that
hold a replica. The worker with the highest memory usage (downstream of pending
replications and drops) will be returned.
"""
if len(ts.who_has) - len(pending_drop) < 2:
return None
if candidates is None:
candidates = ts.who_has.copy()
else:
candidates &= ts.who_has
candidates -= pending_drop
candidates -= {waiter_ts.processing_on for waiter_ts in ts.waiters}
Collaborator:
When is it possible for a task that's waiting on this one to also be processing? I believe it, I'm just not familiar with this case.

Collaborator Author:
Yes, a dependent task is only removed from waiters once it has finished processing:

>>> import time
>>> import distributed
>>> client = distributed.Client(n_workers=1)
>>> s = client.cluster.scheduler
>>> def f(x):
...     time.sleep(600)
>>> f1 = client.submit(lambda: 1, key="f1")
>>> f2 = client.submit(f, f1, key="f2")
>>> time.sleep(0.2)
>>> s.tasks["f1"].waiters
{<TaskState 'f2' processing>}
>>> s.tasks["f2"].processing_on
<WorkerState 'tcp://10.35.0.6:45913', name: 0, memory: 1, processing: 1>

Collaborator:
Interesting. And we aren't willing to release f1 until f2 has completed, even though in theory we could release it as soon as f2 has started (since it's already been passed the necessary data), at the cost of losing resilience to failures in f2?

Collaborator Author:
In the best case, if f2 is still running it's also holding a reference to the data of f1, so your RAM benefit from releasing f1 early is nil.
In the worst case, you get a race condition as f2 did not have the time to fetch its inputs yet (didn't check).
In all cases, you lose the ability to immediately rerun f2 on the same node.

if not candidates:
return None
# id(ws) is there just to prevent WorkerState objects from being compared in the
# unlikely event that they have exactly the same amount of bytes allocated.
_, _, ws = max((self.workers_memory[ws], id(ws), ws) for ws in candidates)
return ws


class ActiveMemoryManagerPolicy:
"""Abstract parent class"""

manager: ActiveMemoryManagerExtension

def __repr__(self) -> str:
return f"{self.__class__.__name__}()"

def run(
self,
) -> Generator[
tuple[str, TaskState, Optional[set[WorkerState]]],
Optional[WorkerState],
None,
]:
"""This method is invoked by the ActiveMemoryManager every few seconds, or
whenever the user invokes scheduler.amm_run_once().
It is an iterator that must emit any of the following:

- "replicate", <TaskState>, None
- "replicate", <TaskState>, {subset of potential workers to replicate to}
- "drop", <TaskState>, None
- "drop", <TaskState>, {subset of potential workers to drop from}

Each element yielded indicates the desire to create or destroy a single replica
of a key. If a subset of workers is not provided, it defaults to all workers on
the cluster. Either the ActiveMemoryManager or the Worker may later decide to
disregard the request, e.g. because it would delete the last copy of a key or
because the key is currently needed on that worker.

You may optionally retrieve the worker that was chosen to replicate the key to or
drop it from, as follows:

```python
choice = yield "replicate", ts, None
```

``choice`` is either a WorkerState or None; the latter is returned if the
ActiveMemoryManager chose to disregard the request.

The current pending (accepted) commands can be inspected on
``self.manager.pending``; this includes the commands previously yielded by this
same method.

The current memory usage on each worker, *downstream of all pending commands*,
can be inspected on ``self.manager.workers_memory``.
"""
raise NotImplementedError("Virtual method")
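
As an illustration of the protocol documented in run() above, here is a minimal toy policy; it is not part of this PR's diff, and the class name and key prefix are made up:

```python
class ReplicateKeyPrefix(ActiveMemoryManagerPolicy):
    """Toy example: request one extra replica of every in-memory task whose
    key starts with a given prefix. Illustrative only.
    """

    def __init__(self, prefix: str):
        self.prefix = prefix

    def run(self):
        for ts in self.manager.scheduler.tasks.values():
            if ts.state == "memory" and str(ts.key).startswith(self.prefix):
                ws = yield "replicate", ts, None
                if ws is None:
                    # The manager disregarded the request, e.g. because all
                    # workers already hold a replica of this key
                    continue
```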


class ReduceReplicas(ActiveMemoryManagerPolicy):
"""Make sure that in-memory tasks are not replicated on more workers than desired;
drop the excess replicas.
"""

def run(self):
# TODO this is O(n) to the total number of in-memory tasks on the cluster; it
# could be made faster by automatically attaching it to a TaskState when it
# goes above one replica and detaching it when it drops below two.
for ts in self.manager.scheduler.tasks.values():
if len(ts.who_has) < 2:
continue
Member:
To avoid lots of work here I wonder if we might ...

  1. Iterate through workers that are saturated first, and then through their tasks
  2. Stop iterating once we get to workers that have enough space on them
  3. Short circuit this entire process if there hasn't been a transition since last time (there is a transition_counter on the scheduler). This should help us avoid activity in an idle cluster.

Collaborator Author (@crusaderky, Jul 29, 2021):
  1. Iterate through workers that are saturated first, and then through their tasks
  2. Stop iterating once we get to workers that have enough space on them

It is possible. Or, better in my opinion, track the tasks with 2+ replicas and iterate only on them; stop tracking them when they go back down to 1 replica. This will require a (straightforward) hook on changes to TaskState.who_has.
Either way I'd prefer leaving this to a following PR.

Short circuit this entire process if there hasn't been a transition since last time

There is no transition when you increase or decrease the number of replicas, unless you change from 0 to 1 or from 1 to 0. We could change that by introducing a degenerate transition from "memory" to "memory" whenever you add or remove a replica. This would however break all downstream SchedulerPlugins that rely on the transition kwargs, and in general I suspect it may be painful to implement - ping @fjetter for opinion.
Regardless of the complexity of the change, it seems uninteresting to me to reduce the CPU load on an idle cluster - when there is no CPU load to begin with.

Member:
I think the (memory, memory) transition would cause quite some awkwardness and potential headaches. Introducing another counter would likely be much simpler.

I do second the requirement for us to keep activity low when idle. We've been receiving some sparse user reports about this already (in this case workers are usually more annoying, still...) but I am fine with iterating on this in another PR.


desired_replicas = 1 # TODO have a marker on TaskState
Member:
Already looking forward to this. Would it make sense for us to open a GH issue for this already?

Collaborator Author:
During my latest chat with @mrocklin, he said he's uninterested in revamping the replicate() functionality and he'd rather deprecate it entirely for the time being and potentially reintroduce it on top of AMM at a later date.

Member:
Agreed, this is definitely something we should only build after AMM is done, and it is of relatively low priority. The use case I have in mind is to allow task annotations to replicate expensive but small intermediate results, to increase resilience. That would allow "checkpointing" of computations.

A long time ago I opened an issue sketching out some ideas around this. Just dropping the ref here; we can move on for now: #3184

ndrop = len(ts.who_has) - max(desired_replicas, len(ts.waiters))
Collaborator:
Is ndrop expected to become negative (if there are more waiters than workers holding the key)?

Relatedly, what if all those waiters are going to run on the same worker? Then len(ts.waiters) would over-count. Would we want something like len({tss.processing_on for tss in ts.waiters if tss.processing_on})?

Collaborator Author:
Yes, ndrop can totally become negative. list(range(-123)) -> []. Added clarification.
Good catch on the multiple tasks running on the same worker - I revised it now, see if you like it.

if ts in self.manager.pending:
pending_repl, pending_drop = self.manager.pending[ts]
ndrop += len(pending_repl) - len(pending_drop)
for _ in range(ndrop):
yield "drop", ts, None
25 changes: 25 additions & 0 deletions distributed/distributed-schema.yaml
@@ -242,6 +242,31 @@ properties:
A list of trusted root modules the scheduler is allowed to import (incl. submodules). For security reasons, the
scheduler does not import arbitrary Python modules.

active-memory-manager:
type: object
required: [start, interval, policies]
additionalProperties: false
properties:
start:
type: boolean
description: set to true to auto-start the AMM on Scheduler init;
false to manually start it with client.scheduler.amm_start()
interval:
type: string
description:
Time expression, e.g. "2s". Run the AMM cycle every <interval>.
policies:
type: array
items:
type: object
required: [class]
properties:
class:
type: string
description: fully qualified name of an ActiveMemoryManagerPolicy
subclass
additionalProperties:
description: keyword arguments to the policy constructor, if any

worker:
type: object
12 changes: 12 additions & 0 deletions distributed/distributed.yaml
@@ -60,6 +60,12 @@ distributed:
- dask
- distributed

active-memory-manager:
# Set to true to auto-start the Active Memory Manager on Scheduler start; if false
# you'll have to either manually start it with client.scheduler.amm_start() or run
# it once with client.scheduler.amm_run_once().
start: false
# Once started, run the AMM cycle every <interval>
interval: 2s
policies:
# Policies that should be executed at every cycle. Any additional keys in each
# object are passed as keyword arguments to the policy constructor.
- class: distributed.active_memory_manager.ReduceReplicas
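
A hedged sketch of how the "additional keys become keyword arguments" rule above could be used from Python with a hypothetical user-defined policy; mymodule.MyPolicy and its max_replicas argument are made up, and the config must be in place before the Scheduler is created:

```python
import dask

dask.config.set({
    "distributed.scheduler.active-memory-manager": {
        "start": True,
        "interval": "2s",
        "policies": [
            {"class": "distributed.active_memory_manager.ReduceReplicas"},
            # extra keys are passed to the constructor: MyPolicy(max_replicas=2)
            {"class": "mymodule.MyPolicy", "max_replicas": 2},
        ],
    }
})
```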

worker:
blocked-handlers: []
multiprocessing-method: spawn