diff --git a/distributed/__init__.py b/distributed/__init__.py
index 71e71a79143..ac324592dd2 100644
--- a/distributed/__init__.py
+++ b/distributed/__init__.py
@@ -2,6 +2,7 @@
 from . import config
 from dask.config import config
+from .actor import Actor, ActorFuture
 from .core import connect, rpc
 from .deploy import LocalCluster, Adaptive
 from .diagnostics import progress
diff --git a/distributed/actor.py b/distributed/actor.py
new file mode 100644
index 00000000000..b97c79a6041
--- /dev/null
+++ b/distributed/actor.py
@@ -0,0 +1,209 @@
+from tornado import gen
+import functools
+
+from .client import Future, default_client
+from .compatibility import get_thread_identity, Queue
+from .protocol import to_serialize
+from .utils import sync
+from .utils_comm import WrappedKey
+from .worker import get_worker
+
+
+class Actor(WrappedKey):
+    """ Controls an object on a remote worker
+
+    An actor allows remote control of a stateful object living on a remote
+    worker. Method calls on this object trigger operations on the remote
+    object and return ActorFutures on which we can block to get results.
+
+    Examples
+    --------
+    >>> class Counter:
+    ...     def __init__(self):
+    ...         self.n = 0
+    ...     def increment(self):
+    ...         self.n += 1
+    ...         return self.n
+
+    >>> from dask.distributed import Client
+    >>> client = Client()
+
+    You can create an actor by submitting a class with the keyword
+    ``actor=True``.
+
+    >>> future = client.submit(Counter, actor=True)
+    >>> counter = future.result()
+    >>> counter
+    <Actor: Counter, key=Counter-1234abcd>
+
+    Calling methods on this object immediately returns deferred ``ActorFuture``
+    objects. You can call ``.result()`` on these objects to block and get the
+    result of the function call.
+
+    >>> future = counter.increment()
+    >>> future.result()
+    1
+    >>> future = counter.increment()
+    >>> future.result()
+    2
+    """
+    def __init__(self, cls, address, key, worker=None):
+        self._cls = cls
+        self._address = address
+        self.key = key
+        self._future = None
+        if worker:
+            self._worker = worker
+            self._client = None
+        else:
+            try:
+                self._worker = get_worker()
+            except ValueError:
+                self._worker = None
+            try:
+                self._client = default_client()
+                self._future = Future(key)
+            except ValueError:
+                self._client = None
+
+    def __repr__(self):
+        return '<Actor: %s, key=%s>' % (self._cls.__name__, self.key)
+
+    def __reduce__(self):
+        return (Actor, (self._cls, self._address, self.key))
+
+    @property
+    def _io_loop(self):
+        if self._worker:
+            return self._worker.io_loop
+        else:
+            return self._client.io_loop
+
+    @property
+    def _scheduler_rpc(self):
+        if self._worker:
+            return self._worker.scheduler
+        else:
+            return self._client.scheduler
+
+    @property
+    def _worker_rpc(self):
+        if self._worker:
+            return self._worker.rpc(self._address)
+        else:
+            if self._client.direct_to_workers:
+                return self._client.rpc(self._address)
+            else:
+                return ProxyRPC(self._client.scheduler, self._address)
+
+    @property
+    def _asynchronous(self):
+        if self._client:
+            return self._client.asynchronous
+        else:
+            return get_thread_identity() == self._worker.thread_id
+
+    def _sync(self, func, *args, **kwargs):
+        if self._client:
+            return self._client.sync(func, *args, **kwargs)
+        else:
+            # TODO support sync operation by checking against thread ident of loop
+            return sync(self._worker.loop, func, *args, **kwargs)
+
+    def __dir__(self):
+        o = set(dir(type(self)))
+        o.update(attr for attr in dir(self._cls) if not attr.startswith('_'))
+        return sorted(o)
+
+    def __getattr__(self, key):
+        attr = getattr(self._cls, key)
+
+        if self._future and not self._future.status == 'finished':
+            raise ValueError("Worker holding Actor was lost")
+
+        if callable(attr):
+            @functools.wraps(attr)
+            def func(*args, **kwargs):
+                @gen.coroutine
+                def run_actor_function_on_worker():
+                    try:
+                        result = yield self._worker_rpc.actor_execute(
+                            function=key,
+                            actor=self.key,
+                            args=[to_serialize(arg) for arg in args],
+                            kwargs={k: to_serialize(v) for k, v in kwargs.items()},
+                        )
+                    except OSError:
+                        if self._future:
+                            yield self._future
+                        else:
+                            raise OSError("Unable to contact Actor's worker")
+                    raise gen.Return(result['result'])
+
+                if self._asynchronous:
+                    return run_actor_function_on_worker()
+                else:
+                    # TODO: this mechanism is error prone
+                    # we should endeavor to make dask's standard code work here
+                    q = Queue()
+
+                    @gen.coroutine
+                    def wait_then_add_to_queue():
+                        x = yield run_actor_function_on_worker()
+                        q.put(x)
+                    self._io_loop.add_callback(wait_then_add_to_queue)
+
+                    return ActorFuture(q, self._io_loop)
+            return func
+
+        else:
+            @gen.coroutine
+            def get_actor_attribute_from_worker():
+                x = yield self._worker_rpc.actor_attribute(attribute=key, actor=self.key)
+                raise gen.Return(x['result'])
+
+            return self._sync(get_actor_attribute_from_worker)
+
+
+class ProxyRPC(object):
+    """
+    An rpc-like object that uses the scheduler's rpc to connect to a worker
+    """
+    def __init__(self, rpc, address):
+        self.rpc = rpc
+        self._address = address
+
+    def __getattr__(self, key):
+        @gen.coroutine
+        def func(**msg):
+            msg['op'] = key
+            result = yield self.rpc.proxy(worker=self._address, msg=msg)
+            raise gen.Return(result)
+
+        return func
+
+
+class ActorFuture(object):
+    """ Future to an actor's method call
+
+    Whenever you call a method on an Actor you get an ActorFuture immediately
+    while the computation happens in the background. You can call ``.result``
+    to block and collect the full result.
+
+    See Also
+    --------
+    Actor
+    """
+    def __init__(self, q, io_loop):
+        self.q = q
+        self.io_loop = io_loop
+
+    def result(self, timeout=None):
+        try:
+            return self._cached_result
+        except AttributeError:
+            self._cached_result = self.q.get(timeout=timeout)
+            return self._cached_result
+
+    def __repr__(self):
+        return '<ActorFuture>'
diff --git a/distributed/client.py b/distributed/client.py
index 6b5de8ae3d4..f136b91bcd6 100644
--- a/distributed/client.py
+++ b/distributed/client.py
@@ -401,19 +401,7 @@ def lose(self):
         self._get_event().clear()
 
     def set_error(self, exception, traceback):
-        if isinstance(exception, bytes):
-            try:
-                exception = loads(exception)
-            except TypeError:
-                exception = Exception("Undeserializable exception", exception)
-        if traceback:
-            if isinstance(traceback, bytes):
-                try:
-                    traceback = loads(traceback)
-                except (TypeError, AttributeError):
-                    traceback = None
-        else:
-            traceback = None
+        _, exception, traceback = clean_exception(exception, traceback)
 
         self.status = 'error'
         self.exception = exception
@@ -482,6 +470,9 @@ class resembles executors in ``concurrent.futures`` but also allows
     name: string (optional)
         Gives the client a name that will be included in logs generated on
         the scheduler for matters relating to this client
+    direct_to_workers: bool (optional)
+        Can this client connect directly to workers or should it proxy through
+        the scheduler?
heartbeat_interval: int Time in milliseconds between heartbeats to scheduler @@ -514,7 +505,7 @@ def __init__(self, address=None, loop=None, timeout=no_default, security=None, asynchronous=False, name=None, heartbeat_interval=None, serializers=None, deserializers=None, - extensions=DEFAULT_EXTENSIONS, + extensions=DEFAULT_EXTENSIONS, direct_to_workers=False, **kwargs): if timeout == no_default: timeout = dask.config.get('distributed.comm.timeouts.connect') @@ -544,6 +535,7 @@ def __init__(self, address=None, loop=None, timeout=no_default, if deserializers is None: deserializers = serializers self._deserializers = deserializers + self.direct_to_workers = direct_to_workers # Communication self.security = security or Security() @@ -1197,13 +1189,14 @@ def submit(self, func, *args, **kwargs): raise TypeError("First input to submit must be a callable function") key = kwargs.pop('key', None) - pure = kwargs.pop('pure', True) workers = kwargs.pop('workers', None) resources = kwargs.pop('resources', None) retries = kwargs.pop('retries', None) priority = kwargs.pop('priority', 0) fifo_timeout = kwargs.pop('fifo_timeout', '100ms') allow_other_workers = kwargs.pop('allow_other_workers', False) + actor = kwargs.pop('actor', kwargs.pop('actors', False)) + pure = kwargs.pop('pure', not actor) if allow_other_workers not in (True, False, None): raise TypeError("allow_other_workers= must be True or False") @@ -1242,7 +1235,8 @@ def submit(self, func, *args, **kwargs): user_priority=priority, resources={skey: resources} if resources else None, retries=retries, - fifo_timeout=fifo_timeout) + fifo_timeout=fifo_timeout, + actors=actor) logger.debug("Submit %s(...), %s", funcname(func), key) @@ -1324,13 +1318,14 @@ def map(self, func, *iterables, **kwargs): key = kwargs.pop('key', None) key = key or funcname(func) - pure = kwargs.pop('pure', True) workers = kwargs.pop('workers', None) retries = kwargs.pop('retries', None) resources = kwargs.pop('resources', None) user_priority = kwargs.pop('priority', 0) allow_other_workers = kwargs.pop('allow_other_workers', False) fifo_timeout = kwargs.pop('fifo_timeout', '100ms') + actor = kwargs.pop('actor', kwargs.pop('actors', False)) + pure = kwargs.pop('pure', not actor) if allow_other_workers and workers is None: raise ValueError("Only use allow_other_workers= if using workers=") @@ -1388,7 +1383,8 @@ def map(self, func, *iterables, **kwargs): resources=resources, retries=retries, user_priority=user_priority, - fifo_timeout=fifo_timeout) + fifo_timeout=fifo_timeout, + actors=actor) logger.debug("map(%s, ...)", funcname(func)) return [futures[tokey(k)] for k in keys] @@ -1408,6 +1404,8 @@ def _gather(self, futures, errors='raise', direct=None, local_worker=None): else: if w.scheduler.address == self.scheduler.address: direct = True + if direct is None: + direct = self.direct_to_workers @gen.coroutine def wait(k): @@ -1610,6 +1608,8 @@ def _scatter(self, data, workers=None, broadcast=False, direct=None, else: if w.scheduler.address == self.scheduler.address: direct = True + if direct is None: + direct = self.direct_to_workers if local_worker: # running within task local_worker.update_data(data=data, report=False) @@ -2073,7 +2073,7 @@ def run_coroutine(self, function, *args, **kwargs): def _graph_to_futures(self, dsk, keys, restrictions=None, loose_restrictions=None, priority=None, user_priority=0, resources=None, retries=None, - fifo_timeout=0): + fifo_timeout=0, actors=None): with self._lock: if resources: resources = self._expand_resources(resources, @@ -2083,6 
+2083,9 @@ def _graph_to_futures(self, dsk, keys, restrictions=None, retries = self._expand_retries(retries, all_keys=itertools.chain(dsk, keys)) + if actors is not None and actors is not True and actors is not False: + actors = list(self._expand_key(actors)) + keyset = set(keys) flatkeys = list(map(tokey, keys)) futures = {key: Future(key, self, inform=False) for key in keyset} @@ -2137,7 +2140,8 @@ def _graph_to_futures(self, dsk, keys, restrictions=None, 'resources': resources, 'submitting_task': getattr(thread_state, 'key', None), 'retries': retries, - 'fifo_timeout': fifo_timeout}) + 'fifo_timeout': fifo_timeout, + 'actors': actors}) return futures def get(self, dsk, keys, restrictions=None, loose_restrictions=None, @@ -2257,7 +2261,8 @@ def normalize_collection(self, collection): def compute(self, collections, sync=False, optimize_graph=True, workers=None, allow_other_workers=False, resources=None, - retries=0, priority=0, fifo_timeout='60s', **kwargs): + retries=0, priority=0, fifo_timeout='60s', actors=None, + **kwargs): """ Compute dask collections on cluster Parameters @@ -2350,7 +2355,8 @@ def compute(self, collections, sync=False, optimize_graph=True, resources=resources, retries=retries, user_priority=priority, - fifo_timeout=fifo_timeout) + fifo_timeout=fifo_timeout, + actors=actors) i = 0 futures = [] @@ -2373,7 +2379,7 @@ def compute(self, collections, sync=False, optimize_graph=True, def persist(self, collections, optimize_graph=True, workers=None, allow_other_workers=None, resources=None, retries=None, - priority=0, fifo_timeout='60s', **kwargs): + priority=0, fifo_timeout='60s', actors=None, **kwargs): """ Persist dask collections on cluster Starts computation of the collection on the cluster in the background. @@ -2442,7 +2448,8 @@ def persist(self, collections, optimize_graph=True, workers=None, resources=resources, retries=retries, user_priority=priority, - fifo_timeout=fifo_timeout) + fifo_timeout=fifo_timeout, + actors=actors) postpersists = [c.__dask_postpersist__() for c in collections] result = [func({k: futures[k] for k in flatten(c.__dask_keys__())}, *args) diff --git a/distributed/core.py b/distributed/core.py index 05b901cf818..aa8b77984d7 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -233,7 +233,7 @@ def port(self): _, self._port = get_address_host_port(self.address) return self._port - def identity(self, comm): + def identity(self, comm=None): return {'type': type(self).__name__, 'id': self.id} def listen(self, port_or_addr=None, listen_args=None): @@ -851,9 +851,17 @@ def clean_exception(exception, traceback, **kwargs): error_message: create and serialize errors into message """ if isinstance(exception, bytes): - exception = protocol.pickle.loads(exception) + try: + exception = protocol.pickle.loads(exception) + except Exception: + exception = Exception(exception) + elif isinstance(exception, str): + exception = Exception(exception) if isinstance(traceback, bytes): - traceback = protocol.pickle.loads(traceback) + try: + traceback = protocol.pickle.loads(traceback) + except (TypeError, AttributeError): + traceback = None elif isinstance(traceback, string_types): traceback = None # happens if the traceback failed serializing return type(exception), exception, traceback diff --git a/distributed/scheduler.py b/distributed/scheduler.py index a2412068c7b..034ea3eeadb 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -182,6 +182,12 @@ class WorkerState(object): The last time we received a heartbeat from this worker, in local 
scheduler time. + .. attribute:: actors: {TaskState} + + A set of all TaskStates on this worker that are actors. This only + includes those actors whose state actually lives on this worker, not + actors to which this worker has a reference. + """ # XXX need a state field to signal active/removed? @@ -200,6 +206,7 @@ class WorkerState(object): 'used_resources', 'status', 'last_seen', + 'actors', ) def __init__(self, worker, ncores, memory_limit, name=None): @@ -214,6 +221,7 @@ def __init__(self, worker, ncores, memory_limit, name=None): self.resources = {} self.used_resources = {} self.last_seen = 0 + self.actors = set() self.info = {'name': name, 'memory_limit': memory_limit, @@ -453,10 +461,13 @@ class TaskState(object): into the "processing" state and be sent for execution to another connected worker. - """ + .. attribute: actor: bool + Whether or not this task is an Actor. + """ __slots__ = ( # === General description === + 'actor', # Key name 'key', # Key prefix (see key_split()) @@ -518,6 +529,7 @@ def __init__(self, key, run_spec): self.worker_restrictions = None self.resource_restrictions = None self.loose_restrictions = False + self.actor = None def get_nbytes(self): nbytes = self.nbytes @@ -903,6 +915,7 @@ def __init__( 'feed': self.feed, 'terminate': self.close, 'broadcast': self.broadcast, + 'proxy': self.proxy, 'ncores': self.get_ncores, 'has_what': self.get_has_what, 'who_has': self.get_who_has, @@ -1300,7 +1313,7 @@ def update_graph(self, client=None, tasks=None, keys=None, dependencies=None, restrictions=None, priority=None, loose_restrictions=None, resources=None, submitting_task=None, retries=None, user_priority=0, - fifo_timeout=0): + actors=None, fifo_timeout=0): """ Add new computations to the internal dask graph @@ -1400,6 +1413,12 @@ def update_graph(self, client=None, tasks=None, keys=None, if isinstance(user_priority, Number): user_priority = {k: user_priority for k in tasks} + # Add actors + if actors is True: + actors = list(keys) + for actor in actors or []: + self.tasks[actor].actor = True + priority = priority or dask.order.order(tasks) # TODO: define order wrt old graph if submitting_task: # sub-tasks get better priority than parent tasks @@ -1964,6 +1983,8 @@ def send_task_to_worker(self, worker, key): 'duration': self.get_task_duration(ts)} if ts.resource_restrictions: msg['resource_restrictions'] = ts.resource_restrictions + if ts.actor: + msg['actor'] = True deps = ts.dependencies if deps: @@ -2323,6 +2344,13 @@ def send_message(addr): raise Return(dict(zip(workers, results))) + @gen.coroutine + def proxy(self, comm=None, msg=None, worker=None, serializers=None): + """ Proxy a communication through the scheduler to some other worker """ + d = yield self.broadcast(comm=comm, msg=msg, workers=[worker], + serializers=serializers) + raise gen.Return(d[worker]) + @gen.coroutine def rebalance(self, comm=None, keys=None, workers=None): """ Rebalance keys so that each worker stores roughly equal bytes @@ -3198,6 +3226,9 @@ def transition_waiting_processing(self, key): self.check_idle_saturated(ws) self.n_tasks += 1 + if ts.actor: + ws.actors.add(ts) + # logger.debug("Send job to worker: %s, %s", worker, key) self.send_task_to_worker(worker, key) @@ -3338,6 +3369,14 @@ def transition_memory_released(self, key, safe=False): if safe: assert not ts.waiters + if ts.actor: + for ws in ts.who_has: + ws.actors.discard(ts) + if ts.who_wants: + ts.exception_blame = ts + ts.exception = "Worker holding Actor was lost" + return {ts.key: 'erred'} # don't try to recreate + 
recommendations = OrderedDict() for dts in ts.waiters: @@ -3501,6 +3540,10 @@ def transition_processing_erred(self, key, cause=None, exception=None, assert not ts.who_has assert not ts.waiting_on + if ts.actor: + ws = ts.processing_on + ws.actors.remove(ts) + self._remove_from_processing(ts) if exception is not None: @@ -3642,6 +3685,11 @@ def transition_memory_forgotten(self, key): assert 0, (ts,) recommendations = {} + + if ts.actor: + for ws in ts.who_has: + ws.actors.discard(ts) + self._propagate_forgotten(ts, recommendations) self.report_on_key(ts=ts) @@ -3983,7 +4031,11 @@ def worker_objective(self, ts, ws): if ws not in dts.who_has]) stack_time = ws.occupancy / ws.ncores start_time = comm_bytes / BANDWIDTH + stack_time - return (start_time, ws.nbytes) + + if ts.actor: + return (len(ws.actors), start_time, ws.nbytes) + else: + return (start_time, ws.nbytes) @gen.coroutine def get_profile(self, comm=None, workers=None, merge_workers=True, @@ -4153,8 +4205,11 @@ def decide_worker(ts, all_workers, valid_workers, objective): """ deps = ts.dependencies assert all(dts.who_has for dts in deps) - candidates = frequencies([ws for dts in deps - for ws in dts.who_has]) + if ts.actor: + candidates = all_workers + else: + candidates = frequencies([ws for dts in deps + for ws in dts.who_has]) if valid_workers is True: if not candidates: candidates = all_workers @@ -4231,6 +4286,21 @@ def validate_task_state(ts): assert ts in cs.wants_what, \ ("not in who_wants' wants_what", str(ts), str(cs), str(cs.wants_what)) + if ts.actor: + if ts.state == 'memory': + assert sum([ts in ws.actors for ws in ts.who_has]) == 1 + if ts.state == 'processing': + assert ts in ts.processing_on.actors + + +def validate_worker_state(ws): + for ts in ws.has_what: + assert ws in ts.who_has, \ + ("not in has_what' who_has", str(ws), str(ts), str(ts.who_has)) + + for ts in ws.actors: + assert ts.state in ('memory', 'processing') + def validate_state(tasks, workers, clients): """ @@ -4243,9 +4313,7 @@ def validate_state(tasks, workers, clients): validate_task_state(ts) for ws in workers.values(): - for ts in ws.has_what: - assert ws in ts.who_has, \ - ("not in has_what' who_has", str(ws), str(ts), str(ts.who_has)) + validate_worker_state(ws) for cs in clients.values(): for ts in cs.wants_what: diff --git a/distributed/tests/test_actor.py b/distributed/tests/test_actor.py new file mode 100644 index 00000000000..580b884b554 --- /dev/null +++ b/distributed/tests/test_actor.py @@ -0,0 +1,521 @@ +import operator +from time import sleep +from tornado import gen + +import pytest + +import dask +from distributed import Actor, ActorFuture, Client, Future, wait, Nanny +from distributed.utils_test import gen_cluster, cluster +from distributed.utils_test import loop # noqa: F401 +from distributed.metrics import time + + +class Counter(object): + n = 0 + + def __init__(self): + self.n = 0 + + def increment(self): + self.n += 1 + return self.n + + def add(self, x): + self.n += x + return self.n + + +class List(object): + L = [] + + def __init__(self, dummy=None): + self.L = [] + + def append(self, x): + self.L.append(x) + + +class ParameterServer(object): + def __init__(self): + self.data = {} + + def put(self, key, value): + self.data[key] = value + + def get(self, key): + return self.data[key] + + +@pytest.mark.parametrize('direct_to_workers', [True, False]) +def test_client_actions(direct_to_workers): + + @gen_cluster(client=True) + def test(c, s, a, b): + c = yield Client(s.address, asynchronous=True, + 
direct_to_workers=direct_to_workers) + + counter = c.submit(Counter, workers=[a.address], actor=True) + assert isinstance(counter, Future) + counter = yield counter + assert counter._address + assert hasattr(counter, 'increment') + assert hasattr(counter, 'add') + assert hasattr(counter, 'n') + + n = yield counter.n + assert n == 0 + + assert counter._address == a.address + + assert isinstance(a.actors[counter.key], Counter) + assert s.tasks[counter.key].actor + + yield [counter.increment(), counter.increment()] + + n = yield counter.n + assert n == 2 + + counter.add(10) + while (yield counter.n) != 10 + 2: + n = yield counter.n + yield gen.sleep(0.01) + + yield c.close() + + test() + + +@pytest.mark.parametrize('separate_thread', [False, True]) +def test_worker_actions(separate_thread): + + @gen_cluster(client=True) + def test(c, s, a, b): + counter = c.submit(Counter, workers=[a.address], actor=True) + a_address = a.address + + def f(counter): + start = counter.n + + assert type(counter) is Actor + assert counter._address == a_address + + future = counter.increment(separate_thread=separate_thread) + assert isinstance(future, ActorFuture) + assert "Future" in type(future).__name__ + end = future.result(timeout=1) + assert end > start + + futures = [c.submit(f, counter, pure=False) for _ in range(10)] + yield futures + + counter = yield counter + assert (yield counter.n) == 10 + + test() + + +@gen_cluster(client=True) +def test_Actor(c, s, a, b): + counter = yield c.submit(Counter, actor=True) + + assert counter._cls == Counter + + assert hasattr(counter, 'n') + assert hasattr(counter, 'increment') + assert hasattr(counter, 'add') + + assert not hasattr(counter, 'abc') + + +@pytest.mark.xfail(reason="Tornado can pass things out of order" + + "Should rely on sending small messages rather than rpc") +@gen_cluster(client=True) +def test_linear_access(c, s, a, b): + start = time() + future = c.submit(sleep, 0.2) + actor = c.submit(List, actor=True, dummy=future) + actor = yield actor + + for i in range(100): + actor.append(i) + + while True: + yield gen.sleep(0.1) + L = yield actor.L + if len(L) == 100: + break + + L = yield actor.L + stop = time() + assert L == tuple(range(100)) + + assert stop - start > 0.2 + + +@gen_cluster(client=True) +def test_exceptions_create(c, s, a, b): + class Foo(object): + x = 0 + + def __init__(self): + raise ValueError('bar') + + with pytest.raises(ValueError) as info: + future = yield c.submit(Foo, actor=True) + + assert "bar" in str(info.value) + + +@gen_cluster(client=True) +def test_exceptions_method(c, s, a, b): + class Foo(object): + def throw(self): + 1 / 0 + + foo = yield c.submit(Foo, actor=True) + with pytest.raises(ZeroDivisionError): + yield foo.throw() + + +@gen_cluster(client=True) +def test_gc(c, s, a, b): + actor = c.submit(Counter, actor=True) + yield wait(actor) + del actor + + while a.actors or b.actors: + yield gen.sleep(0.01) + + +@gen_cluster(client=True) +def test_track_dependencies(c, s, a, b): + actor = c.submit(Counter, actor=True) + yield wait(actor) + x = c.submit(sleep, 0.5) + y = c.submit(lambda x, y: x, x, actor) + del actor + + yield gen.sleep(0.3) + + assert a.actors or b.actors + + +@gen_cluster(client=True) +def test_future(c, s, a, b): + counter = c.submit(Counter, actor=True, workers=[a.address]) + assert isinstance(counter, Future) + yield wait(counter) + assert isinstance(a.actors[counter.key], Counter) + + counter = yield counter + assert isinstance(counter, Actor) + assert counter._address + + yield gen.sleep(0.1) + 
assert counter.key in c.futures # don't lose future + + +@gen_cluster(client=True) +def test_future_dependencies(c, s, a, b): + counter = c.submit(Counter, actor=True, workers=[a.address]) + + def f(a): + assert isinstance(a, Actor) + assert a._cls == Counter + + x = c.submit(f, counter, workers=[b.address]) + yield x + + assert {ts.key for ts in s.tasks[x.key].dependencies} == {counter.key} + assert {ts.key for ts in s.tasks[counter.key].dependents} == {x.key} + + y = c.submit(f, counter, workers=[a.address], pure=False) + yield y + + assert {ts.key for ts in s.tasks[y.key].dependencies} == {counter.key} + assert {ts.key for ts in s.tasks[counter.key].dependents} == {x.key, y.key} + + +def test_sync(loop): + with cluster() as (s, [a, b]): + with Client(s['address'], loop=loop) as c: + counter = c.submit(Counter, actor=True) + counter = counter.result() + + assert counter.n == 0 + + future = counter.increment() + n = future.result() + assert n == 1 + assert counter.n == 1 + + assert future.result() == future.result() + + assert 'ActorFuture' in repr(future) + assert 'distributed.actor' not in repr(future) + + +@gen_cluster(client=True, config={'distributed.comm.timeouts.connect': '1s'}) +def test_failed_worker(c, s, a, b): + future = c.submit(Counter, actor=True, workers=[a.address]) + yield wait(future) + counter = yield future + + yield a._close() + + with pytest.raises(Exception) as info: + yield counter.increment() + + assert "actor" in str(info.value).lower() + assert "worker" in str(info.value).lower() + assert "lost" in str(info.value).lower() + + +@gen_cluster(client=True) +def bench(c, s, a, b): + counter = yield c.submit(Counter, actor=True) + + for i in range(1000): + yield counter.increment() + + +@gen_cluster(client=True) +def test_numpy_roundtrip(c, s, a, b): + np = pytest.importorskip('numpy') + + server = yield c.submit(ParameterServer, actor=True) + + x = np.random.random(1000) + yield server.put('x', x) + + y = yield server.get('x') + + assert (x == y).all() + + +@gen_cluster(client=True) +def test_numpy_roundtrip_getattr(c, s, a, b): + np = pytest.importorskip('numpy') + + counter = yield c.submit(Counter, actor=True) + + x = np.random.random(1000) + + yield counter.add(x) + + y = yield counter.n + + assert (x == y).all() + + +@gen_cluster(client=True) +def test_repr(c, s, a, b): + counter = yield c.submit(Counter, actor=True) + + assert 'Counter' in repr(counter) + assert 'Actor' in repr(counter) + assert counter.key in repr(counter) + assert 'distributed.actor' not in repr(counter) + + +@gen_cluster(client=True) +def test_dir(c, s, a, b): + counter = yield c.submit(Counter, actor=True) + + d = set(dir(counter)) + + for attr in dir(Counter): + if not attr.startswith('_'): + assert attr in d + + +@gen_cluster(client=True) +def test_many_computations(c, s, a, b): + counter = yield c.submit(Counter, actor=True) + + def add(n, counter): + for i in range(n): + counter.increment().result() + + futures = c.map(add, range(10), counter=counter) + done = c.submit(lambda x: None, futures) + + while not done.done(): + assert len(s.processing) <= a.ncores + b.ncores + yield gen.sleep(0.01) + + yield done + + +@gen_cluster(client=True, ncores=[('127.0.0.1', 5)] * 2) +def test_thread_safety(c, s, a, b): + class Unsafe(object): + def __init__(self): + self.n = 0 + + def f(self): + assert self.n == 0 + self.n += 1 + + for i in range(20): + sleep(0.002) + assert self.n == 1 + self.n = 0 + + unsafe = yield c.submit(Unsafe, actor=True) + + futures = [unsafe.f() for i in range(10)] + yield 
futures + + +@gen_cluster(client=True) +def test_Actors_create_dependencies(c, s, a, b): + counter = yield c.submit(Counter, actor=True) + future = c.submit(lambda x: None, counter) + yield wait(future) + assert s.tasks[future.key].dependencies == {s.tasks[counter.key]} + + +@gen_cluster(client=True) +def test_load_balance(c, s, a, b): + class Foo(object): + def __init__(self, x): + pass + + b = c.submit(operator.mul, 'b', 1000000) + yield wait(b) + [ws] = s.tasks[b.key].who_has + + x = yield c.submit(Foo, b, actor=True) + y = yield c.submit(Foo, b, actor=True) + assert x.key != y.key # actors assumed not pure + + assert s.tasks[x.key].who_has == {ws} # first went to best match + assert s.tasks[x.key].who_has != s.tasks[y.key].who_has # second load balanced + + +@gen_cluster(client=True, ncores=[('127.0.0.1', 1)] * 5) +def test_load_balance_map(c, s, *workers): + class Foo(object): + def __init__(self, x, y=None): + pass + + b = c.submit(operator.mul, 'b', 1000000) + yield wait(b) + + actors = c.map(Foo, range(10), y=b, actor=True) + yield wait(actors) + + assert all(len(w.actors) == 2 for w in workers) + + +@gen_cluster(client=True, ncores=[('127.0.0.1', 1)] * 4, Worker=Nanny) +def bench_param_server(c, s, *workers): + import dask.array as da + import numpy as np + x = da.random.random((500000, 1000), chunks=(1000, 1000)) + x = x.persist() + yield wait(x) + + class ParameterServer: + data = None + + def __init__(self, n): + self.data = np.random.random(n) + + def update(self, x): + self.data += x + self.data /= 2 + + def get_data(self): + return self.data + + def f(block, ps=None): + start = time() + params = ps.get_data(separate_thread=False).result() + stop = time() + update = (block - params).mean(axis=0) + ps.update(update, separate_thread=False) + print(format_time(stop - start)) + return np.array([[stop - start]]) + + from distributed.utils import format_time + start = time() + ps = yield c.submit(ParameterServer, x.shape[1], actor=True) + y = x.map_blocks(f, ps=ps, dtype=x.dtype) + # result = yield c.compute(y.mean()) + yield wait(y.persist()) + end = time() + print(format_time(end - start)) + + +@gen_cluster(client=True) +def test_compute(c, s, a, b): + + @dask.delayed + def f(n, counter): + assert isinstance(counter, Actor) + for i in range(n): + counter.increment().result() + + @dask.delayed + def check(counter, blanks): + return counter.n + + counter = dask.delayed(Counter)() + values = [f(i, counter) for i in range(5)] + final = check(counter, values) + + result = yield c.compute(final, actors=counter) + assert result == 0 + 1 + 2 + 3 + 4 + + start = time() + while a.data or b.data or a.actors or b.actors: + yield gen.sleep(0.01) + assert time() < start + 2 + + +@gen_cluster(client=True, ncores=[('127.0.0.1', 1)], + config={'distributed.worker.profile.interval': '1ms'}) +def test_actors_in_profile(c, s, a): + class Sleeper(object): + def sleep(self, time): + sleep(time) + + sleeper = yield c.submit(Sleeper, actor=True) + + for i in range(5): + yield sleeper.sleep(0.200) + if (list(a.profile_recent['children'])[0].startswith('sleep') or + 'Sleeper.sleep' in a.profile_keys): + return + assert False, list(a.profile_keys) + + +@gen_cluster(client=True) +def test_waiter(c, s, a, b): + from tornado.locks import Event + + class Waiter(object): + def __init__(self): + self.event = Event() + + @gen.coroutine + def set(self): + self.event.set() + + @gen.coroutine + def wait(self): + yield self.event.wait() + + waiter = yield c.submit(Waiter, actor=True) + + futures = [waiter.wait() for 
i in range(5)] # way more than we have actor threads + + yield gen.sleep(0.1) + assert not any(future.done() for future in futures) + + yield waiter.set() + + yield futures diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 6ab903ffdee..0e099ccb678 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -2200,6 +2200,12 @@ def test_broadcast(loop): b['address']: {x.key, y.key}} +@gen_cluster(client=True) +def test_proxy(c, s, a, b): + msg = yield c.scheduler.proxy(msg={'op': 'identity'}, worker=a.address) + assert msg['id'] == a.identity()['id'] + + @gen_cluster(client=True) def test__cancel(c, s, a, b): x = c.submit(slowinc, 1) @@ -5440,5 +5446,27 @@ def test_no_threads_lingering(): assert threading.active_count() < 30, list(active.values()) +@gen_cluster() +def test_direct_async(s, a, b): + c = yield Client(s.address, asynchronous=True, direct_to_workers=True) + assert c.direct_to_workers + yield c.close() + + c = yield Client(s.address, asynchronous=True, direct_to_workers=False) + assert not c.direct_to_workers + yield c.close() + + +def test_direct_sync(loop): + with cluster() as (s, [a, b]): + with Client(s['address'], loop=loop) as c: + assert not c.direct_to_workers + + def f(): + return get_client().direct_to_workers + + assert c.submit(f).result() + + if sys.version_info >= (3, 5): from distributed.tests.py3_test_client import * # noqa F401 diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 77730fac85c..4ebfd9869c9 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -705,21 +705,6 @@ def test_stop_doing_unnecessary_work(c, s, a, b): @gen_cluster(client=True, ncores=[('127.0.0.1', 1)]) def test_priorities(c, s, w): - a = delayed(slowinc)(1, dask_key_name='a', delay=0.05) - b = delayed(slowinc)(2, dask_key_name='b', delay=0.05) - a1 = delayed(slowinc)(a, dask_key_name='a1', delay=0.05) - a2 = delayed(slowinc)(a1, dask_key_name='a2', delay=0.05) - b1 = delayed(slowinc)(b, dask_key_name='b1', delay=0.05) - - z = delayed(add)(a2, b1) - future = yield c.compute(z) - - log = [t for t in w.log if t[1] == 'executing' and t[2] == 'memory'] - assert [t[0] for t in log[:5]] == ['a', 'b', 'a1', 'b1', 'a2'] - - -@gen_cluster(client=True, ncores=[('127.0.0.1', 1)]) -def test_priorities_2(c, s, w): values = [] for i in range(10): a = delayed(slowinc)(i, dask_key_name='a-%d' % i, delay=0.01) diff --git a/distributed/utils.py b/distributed/utils.py index 6257a9ba83b..193005dbc83 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -1418,3 +1418,11 @@ def color_of(x, palette=palette): h = md5(str(x).encode()) n = int(h.hexdigest()[:8], 16) return palette[n % len(palette)] + + +def iscoroutinefunction(f): + if gen.is_coroutine_function(f): + return True + if sys.version_info >= (3, 5) and inspect.iscoroutinefunction(f): + return True + return False diff --git a/distributed/utils_test.py b/distributed/utils_test.py index cce9ad75dda..10e5329f4d4 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -8,7 +8,6 @@ import gc from glob import glob import itertools -import inspect import logging import logging.config import os @@ -40,14 +39,14 @@ from tornado.ioloop import IOLoop from .client import default_client, _global_clients -from .compatibility import PY3, iscoroutinefunction, Empty, WINDOWS +from .compatibility import PY3, Empty, WINDOWS from .config import initialize_logging from .core import connect, rpc, CommClosedError from 
.metrics import time from .proctitle import enable_proctitle_on_children from .security import Security from .utils import (ignoring, log_errors, mp_context, get_ip, get_ipv6, - DequeHandler, reset_logger_locks, sync) + DequeHandler, reset_logger_locks, sync, iscoroutinefunction) from .worker import Worker, TOTAL_MEMORY, _global_workers try: @@ -719,12 +718,6 @@ def end_worker(w): s.stop() -def iscoroutinefunction(f): - if sys.version_info >= (3, 5) and inspect.iscoroutinefunction(f): - return True - return False - - def gen_cluster(ncores=[('127.0.0.1', 1), ('127.0.0.1', 2)], scheduler='127.0.0.1', timeout=10, security=None, Worker=Worker, client=False, scheduler_kwargs={}, diff --git a/distributed/worker.py b/distributed/worker.py index 48e4bb8a2e8..f614c8e0b34 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -47,7 +47,7 @@ ignoring, mp_context, import_file, silence_logging, thread_state, json_load_robust, key_split, format_bytes, DequeHandler, PeriodicCallback, - parse_bytes, parse_timedelta) + parse_bytes, parse_timedelta, iscoroutinefunction) from .utils_comm import pack_data, gather_from_workers from .utils_perf import ThrottledGC, enable_gc_diagnosis, disable_gc_diagnosis @@ -160,11 +160,13 @@ def __init__(self, scheduler_ip=None, scheduler_port=None, self.data = Buffer({}, storage, target, weight) else: self.data = dict() + self.actors = {} self.loop = loop or IOLoop.current() self.status = None self._closed = Event() self.reconnect = reconnect self.executor = executor or ThreadPoolExecutor(self.ncores) + self.actor_executor = ThreadPoolExecutor(1) self.name = name self.scheduler_delay = 0 self.stream_comms = dict() @@ -195,6 +197,8 @@ def __init__(self, scheduler_ip=None, scheduler_port=None, 'get_logs': self.get_logs, 'keys': self.keys, 'versions': self.versions, + 'actor_execute': self.actor_execute, + 'actor_attribute': self.actor_attribute, } stream_handlers = { @@ -450,7 +454,7 @@ def _start(self, addr_or_port=0): def start(self, port=0): self.loop.add_callback(self._start, port) - def identity(self, comm): + def identity(self, comm=None): return {'type': type(self).__name__, 'id': self.id, 'scheduler': self.scheduler.address, @@ -477,11 +481,13 @@ def _close(self, report=True, timeout=10, nanny=True, executor_wait=True): yield gen.with_timeout(timedelta(seconds=timeout), self.scheduler.unregister(address=self.contact_address)) self.scheduler.close_rpc() + self.actor_executor._work_queue.queue.clear() if isinstance(self.executor, ThreadPoolExecutor): self.executor._work_queue.queue.clear() self.executor.shutdown(wait=executor_wait, timeout=timeout) else: self.executor.shutdown(wait=False) + self.actor_executor.shutdown(wait=executor_wait, timeout=timeout) self._workdir.release() for k, v in self.services.items(): @@ -527,7 +533,8 @@ def wait_until_closed(self): assert self.status == 'closed' @gen.coroutine - def executor_submit(self, key, function, *args, **kwargs): + def executor_submit(self, key, function, args=(), kwargs=None, + executor=None): """ Safely run function in thread pool executor We've run into issues running concurrent.future futures within @@ -535,9 +542,11 @@ def executor_submit(self, key, function, *args, **kwargs): callbacks to ensure things run smoothly. This can get tricky, so we pull it off into an separate method. 
""" + executor = executor or self.executor job_counter[0] += 1 # logger.info("%s:%d Starts job %d, %s", self.ip, self.port, i, key) - future = self.executor.submit(function, *args, **kwargs) + kwargs = kwargs or {} + future = executor.submit(function, *args, **kwargs) pc = PeriodicCallback(lambda: logger.debug("future state: %s - %s", key, future._state), 1000) pc.start() @@ -558,6 +567,33 @@ def run_coroutine(self, comm, function, args=(), kwargs={}, wait=True): return run(self, comm, function=function, args=args, kwargs=kwargs, is_coro=True, wait=wait) + @gen.coroutine + def actor_execute(self, comm=None, actor=None, function=None, args=(), kwargs={}): + separate_thread = kwargs.pop('separate_thread', True) + key = actor + actor = self.actors[key] + func = getattr(actor, function) + name = key_split(key) + '.' + function + + if iscoroutinefunction(func): + result = yield func(*args, **kwargs) + elif separate_thread: + result = yield self.executor_submit(name, + apply_function_actor, + args=(func, args, kwargs, + self.execution_state, + name, + self.active_threads, + self.active_threads_lock), + executor=self.actor_executor) + else: + result = func(*args, **kwargs) + raise gen.Return({'status': 'OK', 'result': to_serialize(result)}) + + def actor_attribute(self, comm=None, actor=None, attribute=None): + value = getattr(self.actors[actor], attribute) + return {'status': 'OK', 'result': to_serialize(value)} + def update_data(self, comm=None, data=None, report=True, serializers=None): for key, value in data.items(): if key in self.task_state: @@ -618,6 +654,13 @@ def get_data(self, comm, keys=None, who=None, serializers=None, self.outgoing_current_count += 1 data = {k: self.data[k] for k in keys if k in self.data} + + if len(data) < len(keys): + for k in set(keys) - set(data): + if k in self.actors: + from .actor import Actor + data[k] = Actor(type(self.actors[k]), self.address, k) + msg = {'status': 'OK', 'data': {k: to_serialize(v) for k, v in data.items()}} nbytes = {k: self.nbytes.get(k) for k in data} @@ -625,6 +668,7 @@ def get_data(self, comm, keys=None, who=None, serializers=None, if self.digests is not None: self.digests['get-data-load-duration'].add(stop - start) start = time() + try: compressed = yield comm.write(msg, serializers=serializers) response = yield comm.read(deserializers=serializers) @@ -878,6 +922,30 @@ def apply_function(function, args, kwargs, execution_state, key, return msg +def apply_function_actor(function, args, kwargs, execution_state, key, + active_threads, active_threads_lock): + """ Run a function, collect information + + Returns + ------- + msg: dictionary with status, result/error, timings, etc.. 
+ """ + ident = get_thread_identity() + + with active_threads_lock: + active_threads[ident] = key + + thread_state.execution_state = execution_state + thread_state.key = key + + result = function(*args, **kwargs) + + with active_threads_lock: + del active_threads[ident] + + return result + + def get_msg_safe_str(msg): """ Make a worker msg, which contains args and kwargs, safe to cast to str: allowing for some arguments to raise exceptions during conversion and @@ -1267,13 +1335,13 @@ def __repr__(self): def add_task(self, key, function=None, args=None, kwargs=None, task=None, who_has=None, nbytes=None, priority=None, duration=None, - resource_restrictions=None, **kwargs2): + resource_restrictions=None, actor=False, **kwargs2): try: if key in self.tasks: state = self.task_state[key] if state in ('memory', 'error'): if state == 'memory': - assert key in self.data + assert key in self.data or key in self.actors logger.debug("Asked to compute pre-existing result: %s: %s", key, state) self.send_task_state_to_scheduler(key) @@ -1298,6 +1366,8 @@ def add_task(self, key, function=None, args=None, kwargs=None, task=None, try: start = time() self.tasks[key] = _deserialize(function, args, kwargs, task) + if actor: + self.actors[key] = None stop = time() if stop - start > 0.010: @@ -1490,7 +1560,7 @@ def transition_waiting_ready(self, key): assert self.task_state[key] == 'waiting' assert key in self.waiting_for_data assert not self.waiting_for_data[key] - assert all(dep in self.data for dep in self.dependencies[key]) + assert all(dep in self.data or dep in self.actors for dep in self.dependencies[key]) assert key not in self.executing assert key not in self.ready @@ -1532,7 +1602,7 @@ def transition_ready_executing(self, key): # assert key not in self.data assert self.task_state[key] in READY assert key not in self.ready - assert all(dep in self.data for dep in self.dependencies[key]) + assert all(dep in self.data or dep in self.actors for dep in self.dependencies[key]) self.executing.add(key) self.loop.add_callback(self.execute, key) @@ -1712,9 +1782,14 @@ def ensure_communicating(self): raise def send_task_state_to_scheduler(self, key): - if key in self.data: - nbytes = self.nbytes[key] or sizeof(self.data[key]) - typ = self.types.get(key) or type(self.data[key]) + if key in self.data or self.actors.get(key): + try: + value = self.data[key] + except KeyError: + value = self.actors[key] + nbytes = self.nbytes[key] or sizeof(value) + typ = self.types.get(key) or type(value) + del value try: typ = dumps_function(typ) except PicklingError: @@ -1747,11 +1822,15 @@ def put_key_in_memory(self, key, value, transition=True): if key in self.data: return - start = time() - self.data[key] = value - stop = time() - if stop - start > 0.020: - self.startstops[key].append(('disk-write', start, stop)) + if key in self.actors: + self.actors[key] = value + + else: + start = time() + self.data[key] = value + stop = time() + if stop - start > 0.020: + self.startstops[key].append(('disk-write', start, stop)) if key not in self.nbytes: self.nbytes[key] = sizeof(value) @@ -2012,6 +2091,10 @@ def release_key(self, key, cause=None, reason=None, report=True): exc_info=True) del self.nbytes[key] del self.types[key] + if key in self.actors and key not in self.dep_state: + del self.actors[key] + del self.nbytes[key] + del self.types[key] if key in self.waiting_for_data: del self.waiting_for_data[key] @@ -2075,6 +2158,9 @@ def release_dep(self, dep, report=False): if dep in self.data: del self.data[dep] del self.types[dep] + if 
dep in self.actors: + del self.actors[dep] + del self.types[dep] del self.nbytes[dep] if dep in self.in_flight_tasks: @@ -2176,7 +2262,13 @@ def execute(self, key, report=False): function, args, kwargs = self.tasks[key] start = time() - data = {k: self.data[k] for k in self.dependencies[key]} + data = {} + for k in self.dependencies[key]: + try: + data[k] = self.data[k] + except KeyError: + from .actor import Actor # TODO: create local actor + data[k] = Actor(type(self.actors[k]), self.address, k, self) args2 = pack_data(args, data, key_types=(bytes, unicode)) kwargs2 = pack_data(kwargs, data, key_types=(bytes, unicode)) stop = time() @@ -2187,12 +2279,12 @@ def execute(self, key, report=False): logger.debug("Execute key: %s worker: %s", key, self.address) # TODO: comment out? try: - result = yield self.executor_submit(key, apply_function, function, - args2, kwargs2, - self.execution_state, key, - self.active_threads, - self.active_threads_lock, - self.scheduler_delay) + result = yield self.executor_submit(key, apply_function, + args=(function, args2, kwargs2, + self.execution_state, key, + self.active_threads, + self.active_threads_lock, + self.scheduler_delay)) except RuntimeError as e: executor_error = e raise @@ -2353,9 +2445,9 @@ def trigger_profile(self): if frame is not None: key = key_split(active_threads[ident]) profile.process(frame, None, self.profile_recent, - stop='_concurrent_futures_thread.py') + stop='distributed/worker.py') profile.process(frame, None, self.profile_keys[key], - stop='_concurrent_futures_thread.py') + stop='distributed/worker.py') stop = time() if self.digests is not None: self.digests['profile-duration'].add(stop - start) @@ -2438,7 +2530,7 @@ def get_logs(self, comm=None, n=None): return [(msg.levelname, deque_handler.format(msg)) for msg in L] def validate_key_memory(self, key): - assert key in self.data + assert key in self.data or key in self.actors assert key in self.nbytes assert key not in self.waiting_for_data assert key not in self.executing @@ -2450,14 +2542,14 @@ def validate_key_executing(self, key): assert key in self.executing assert key not in self.data assert key not in self.waiting_for_data - assert all(dep in self.data for dep in self.dependencies[key]) + assert all(dep in self.data or dep in self.actors for dep in self.dependencies[key]) def validate_key_ready(self, key): assert key in pluck(1, self.ready) assert key not in self.data assert key not in self.executing assert key not in self.waiting_for_data - assert all(dep in self.data for dep in self.dependencies[key]) + assert all(dep in self.data or dep in self.actors for dep in self.dependencies[key]) def validate_key_waiting(self, key): assert key not in self.data @@ -2495,7 +2587,7 @@ def validate_dep_flight(self, dep): assert dep in self.in_flight_workers[peer] def validate_dep_memory(self, dep): - assert dep in self.data + assert dep in self.data or dep in self.actors assert dep in self.nbytes assert dep in self.types if dep in self.task_state: @@ -2548,7 +2640,7 @@ def validate_state(self): if self.task_state[key] == 'memory': assert isinstance(self.nbytes[key], int) assert key not in self.waiting_for_data - assert key in self.data + assert key in self.data or key in self.actors except Exception as e: logger.exception(e) @@ -2605,6 +2697,7 @@ def _get_client(self, timeout=3): security=self.security, set_as_default=True, asynchronous=asynchronous, + direct_to_workers=True, name='worker', timeout=timeout) if not asynchronous: diff --git a/docs/source/actors.rst 
b/docs/source/actors.rst
new file mode 100644
index 00000000000..109b9d907aa
--- /dev/null
+++ b/docs/source/actors.rst
@@ -0,0 +1,235 @@
+Actors
+======
+
+.. note:: This is an experimental feature and is subject to change without notice.
+.. note:: This is an advanced feature and may not be suitable for beginning users.
+          It is rarely necessary for common workloads.
+
+Actors enable stateful computations within a Dask workflow. They are useful
+for some rare algorithms that require additional performance and are willing to
+sacrifice resilience.
+
+An actor is a pointer to a user-defined object living on a remote worker.
+Anyone with that actor can call methods on that remote object.
+
+Example
+-------
+
+Here we create a simple ``Counter`` class, instantiate that class on one worker,
+and then call methods on that class remotely.
+
+.. code-block:: python
+
+   class Counter:
+       """ A simple class to manage an incrementing counter """
+       n = 0
+
+       def __init__(self):
+           self.n = 0
+
+       def increment(self):
+           self.n += 1
+           return self.n
+
+       def add(self, x):
+           self.n += x
+           return self.n
+
+   from dask.distributed import Client  # Start a Dask Client
+   client = Client()
+
+   future = client.submit(Counter, actor=True)  # Create a Counter on a worker
+   counter = future.result()                    # Get back a pointer to that object
+
+   counter
+   # <Actor: Counter, key=Counter-1234abcd>
+
+   future = counter.increment()  # Call remote method
+   future.result()               # Get back result
+   # 1
+
+   future = counter.add(10)      # Call remote method
+   future.result()               # Get back result
+   # 11
+
+Motivation
+----------
+
+Actors are motivated by some of the challenges of using pure task graphs.
+
+Normal Dask computations are composed of a graph of functions.
+This approach has a few limitations that are good for resilience, but can
+negatively affect performance:
+
+1. **State**: The functions should not mutate their inputs in-place or rely on
+   global state. They should instead operate in a pure-functional manner,
+   consuming inputs and producing separate outputs.
+2. **Central Overhead**: The execution location and order is determined by the
+   centralized scheduler. Because the scheduler is involved in every decision
+   it can sometimes create a central bottleneck.
+
+Some workloads may need to update state directly, or may involve more tiny
+tasks than the scheduler can handle (the scheduler can coordinate about 4000
+tasks per second).
+
+Actors side-step both of these limitations:
+
+1. **State**: Actors can hold on to and mutate state. They are allowed to
+   update their state in-place.
+2. **Overhead**: Operations on actors do not inform the central scheduler, and
+   so do not contribute to the 4000 task/second overhead. They also avoid an
+   extra network hop and so have lower latencies.
+
+Create an Actor
+---------------
+
+You create an actor by submitting a class to run on a worker using normal Dask
+computation functions like ``submit``, ``map``, ``compute``, or ``persist``,
+and using the ``actors=`` keyword (or ``actor=`` on ``submit``).
+
+.. code-block:: python
+
+   future = client.submit(Counter, actor=True)
+
+You can use all other keywords to these functions like ``workers=``,
+``resources=``, and so on to control where this actor ends up.
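+
+For example, to pin the actor to a particular worker you might pass an explicit
+address with ``workers=``, just as you would for a normal task (the address
+below is only a placeholder):
+
+.. code-block:: python
+
+   future = client.submit(Counter, actor=True,
+                          workers=['tcp://127.0.0.1:45311'])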
+
+This creates a normal Dask future on which you can call ``.result()`` to get
+the Actor once it has successfully run on a worker.
+
+.. code-block:: python
+
+   >>> counter = future.result()
+   >>> counter
+   <Actor: Counter, key=Counter-1234abcd>
+
+A ``Counter`` object has been instantiated on one of the workers, and this
+``Actor`` object serves as our proxy to that remote object. It has the same
+methods and attributes.
+
+.. code-block:: python
+
+   >>> dir(counter)
+   ['add', 'increment', 'n']
+
+Call Remote Methods
+-------------------
+
+However accessing an attribute or calling a method will trigger a communication
+to the remote worker, run the method on the remote worker in a separate thread
+pool, and then communicate the result back to the calling side. For attribute
+access these operations block and return when finished; for method calls they
+return an ``ActorFuture`` immediately.
+
+.. code-block:: python
+
+   >>> future = counter.increment()  # Immediately returns an ActorFuture
+   >>> future.result()               # Block until finished and result arrives
+   1
+
+``ActorFuture`` objects are similar to normal Dask ``Future`` objects, but not
+as fully featured. They currently *only* support the ``result`` method and
+nothing else. They don't currently work with any other Dask functions that
+expect futures, like ``as_completed``, ``wait``, or ``client.gather``. They
+can't be placed into additional submit or map calls to form dependencies. They
+communicate their results immediately (rather than waiting for ``result`` to be
+called) and cache the result on the future itself.
+
+Access Attributes
+-----------------
+
+If you define an attribute at the class level then that attribute will be
+accessible to the actor.
+
+.. code-block:: python
+
+   class Counter:
+       n = 0  # Recall that we defined our class with `n` as a class variable
+
+       ...
+
+   >>> counter.n  # Blocks until finished
+   1
+
+Attribute access blocks automatically. It's as though you called ``.result()``.
+
+
+Execution on the Worker
+-----------------------
+
+When you call a method on an actor, your arguments get serialized and sent
+to the worker that owns the actor's object. If you do this from a worker, this
+communication is direct. If you do this from a Client then this will be direct
+if the Client has direct access to the workers (create a client with
+``Client(..., direct_to_workers=True)`` if direct connections are possible) or
+by proxying through the scheduler if direct connections from the client to the
+workers are not possible.
+
+The appropriate method of the Actor's object is then called in a separate
+thread, the result captured, and then sent back to the calling side. Currently
+workers have only a single thread for actors, but this may change in the
+future.
+
+The result is sent back immediately to the calling side, and is not stored on
+the worker with the actor. It is cached on the ``ActorFuture`` object.
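+
+For illustration, a client that is allowed to talk to workers directly might be
+created like this (a sketch: whether direct connections work depends on your
+network setup, and the scheduler address is only a placeholder):
+
+.. code-block:: python
+
+   from dask.distributed import Client
+
+   client = Client('tcp://scheduler-address:8786', direct_to_workers=True)
+
+   future = client.submit(Counter, actor=True)
+   counter = future.result()
+   counter.increment().result()  # method call goes straight to the worker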
+
+
+Calling from coroutines and async/await
+---------------------------------------
+
+If you use actors within a coroutine or async/await function then actor methods
+and attribute access will return Tornado futures.
+
+.. code-block:: python
+
+   async def f():
+       counter = await client.submit(Counter, actor=True)
+
+       await counter.increment()
+       n = await counter.n
+
+
+Coroutines and async/await on the Actor
+---------------------------------------
+
+If you define an ``async def`` function on the actor class then that method
+will run on the Worker's event loop thread rather than a separate thread.
+
+.. code-block:: python
+
+   class Waiter(object):
+       def __init__(self):
+           self.event = tornado.locks.Event()
+
+       async def set(self):
+           self.event.set()
+
+       async def wait(self):
+           await self.event.wait()
+
+   waiter = client.submit(Waiter, actor=True).result()
+   waiter.wait().result()  # waits until set, without consuming a worker thread
+
+
+Performance
+-----------
+
+Worker operations currently have about 1ms of latency, on top of any network
+latency that may exist. However, other activity on a worker can easily increase
+these latencies.
+
+
+Limitations
+-----------
+
+Actors offer advanced capabilities, but with some cost:
+
+1. **No Resilience:** No effort is made to make actor workloads resilient to
+   worker failure. If the worker dies while holding an actor, that actor is
+   lost forever.
+2. **No Diagnostics:** Because the scheduler is not informed about actor
+   computations, no diagnostics are available about these computations.
+3. **No Load Balancing:** Actors are allocated onto workers evenly, without
+   serious consideration given to avoiding communication.
+4. **Experimental:** Actors are a new feature and subject to change without
+   warning.
diff --git a/docs/source/index.rst b/docs/source/index.rst
index 2d1b6328aa6..211e94a4c3b 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -104,6 +104,7 @@ Contents
    :maxdepth: 1
    :caption: Additional Features
 
+   actors
    adaptive
    asynchronous
    configuration