Skip to content

Commit

Permalink
cache result and get docstrings
Browse files Browse the repository at this point in the history
  • Loading branch information
mrocklin committed Jul 26, 2018
1 parent 112bb36 commit 1144009
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 1 deletion.
56 changes: 55 additions & 1 deletion distributed/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,43 @@


class Actor(WrappedKey):
""" Controls an object on a remote worker
An actor allows remote control of a stateful object living on a remote
worker. Method calls on this object trigger operations on the remote
object and return ActorFutures on which we can block to get results.
Examples
--------
>>> class Counter:
... def __init__(self):
... self.n = 0
... def increment(self):
... self.n += 1
... return self.n
>>> from dask.distributed import Client
>>> client = Client()
You can create an actor by submitting a class with the keyword
``actor=True``.
>>> future = client.submit(Counter, actor=True)
>>> counter = future.result()
>>> counter
<Actor: Counter, key=Counter-1234abcd>
Calling methods on this object immediately returns deferred ``ActorFuture``
objects. You can call ``.result()`` on these objects to block and get the
result of the function call.
>>> future = counter.increment()
>>> future.result()
1
>>> future = counter.increment()
>>> future.result()
2
"""
def __init__(self, cls, address, key, worker=None):
self._cls = cls
self._address = address
Expand Down Expand Up @@ -133,6 +170,9 @@ def get_actor_attribute_from_worker():


class ProxyRPC(object):
"""
An rpc-like object that uses the scheduler's rpc to connect to a worker
"""
def __init__(self, rpc, address):
self.rpc = rpc
self._address = address
Expand All @@ -148,12 +188,26 @@ def func(**msg):


class ActorFuture(object):
""" Future to an actor's method call
Whenever you call a method on an Actor you get an ActorFuture immediately
while the computation happens in the background. You can call ``.result``
to block and collect the full result
See Also
--------
Actor
"""
def __init__(self, q, io_loop):
self.q = q
self.io_loop = io_loop

def result(self, timeout=None):
return self.q.get(timeout=timeout)
try:
return self._cached_result
except AttributeError:
self._cached_result = self.q.get(timeout=timeout)
return self._cached_result

def __repr__(self):
return '<ActorFuture>'
2 changes: 2 additions & 0 deletions distributed/tests/test_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ def test_sync(loop):
assert n == 1
assert counter.n == 1

assert future.result() == future.result()

assert 'ActorFuture' in repr(future)
assert 'distributed.actor' not in repr(future)

Expand Down

0 comments on commit 1144009

Please sign in to comment.