From 11440092e73687921d41a9441964f92a1758f9e1 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Wed, 25 Jul 2018 17:51:40 -0700 Subject: [PATCH] cache result and get docstrings --- distributed/actor.py | 56 ++++++++++++++++++++++++++++++++- distributed/tests/test_actor.py | 2 ++ 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/distributed/actor.py b/distributed/actor.py index fc63fb6b238..40784499da8 100644 --- a/distributed/actor.py +++ b/distributed/actor.py @@ -10,6 +10,43 @@ class Actor(WrappedKey): + """ Controls an object on a remote worker + + An actor allows remote control of a stateful object living on a remote + worker. Method calls on this object trigger operations on the remote + object and return ActorFutures on which we can block to get results. + + Examples + -------- + >>> class Counter: + ... def __init__(self): + ... self.n = 0 + ... def increment(self): + ... self.n += 1 + ... return self.n + + >>> from dask.distributed import Client + >>> client = Client() + + You can create an actor by submitting a class with the keyword + ``actor=True``. + + >>> future = client.submit(Counter, actor=True) + >>> counter = future.result() + >>> counter + + + Calling methods on this object immediately returns deferred ``ActorFuture`` + objects. You can call ``.result()`` on these objects to block and get the + result of the function call. + + >>> future = counter.increment() + >>> future.result() + 1 + >>> future = counter.increment() + >>> future.result() + 2 + """ def __init__(self, cls, address, key, worker=None): self._cls = cls self._address = address @@ -133,6 +170,9 @@ def get_actor_attribute_from_worker(): class ProxyRPC(object): + """ + An rpc-like object that uses the scheduler's rpc to connect to a worker + """ def __init__(self, rpc, address): self.rpc = rpc self._address = address @@ -148,12 +188,26 @@ def func(**msg): class ActorFuture(object): + """ Future to an actor's method call + + Whenever you call a method on an Actor you get an ActorFuture immediately + while the computation happens in the background. You can call ``.result`` + to block and collect the full result + + See Also + -------- + Actor + """ def __init__(self, q, io_loop): self.q = q self.io_loop = io_loop def result(self, timeout=None): - return self.q.get(timeout=timeout) + try: + return self._cached_result + except AttributeError: + self._cached_result = self.q.get(timeout=timeout) + return self._cached_result def __repr__(self): return '' diff --git a/distributed/tests/test_actor.py b/distributed/tests/test_actor.py index d452a8b7731..bf90f48e524 100644 --- a/distributed/tests/test_actor.py +++ b/distributed/tests/test_actor.py @@ -249,6 +249,8 @@ def test_sync(loop): assert n == 1 assert counter.n == 1 + assert future.result() == future.result() + assert 'ActorFuture' in repr(future) assert 'distributed.actor' not in repr(future)