@@ -0,0 +1,166 @@
Actors
======

.. note:: This is an experimental feature and is subject to change without notice

.. note:: This is an advanced feature and may not be suitable for beginning users.
          It is rarely necessary for common workloads.

Actors enable stateful computations within a Dask workflow. They are useful
for the rare algorithms that need extra performance and can afford to
sacrifice resilience.

An actor is a pointer to a user-defined object living on a remote worker.
Anyone holding that actor can call methods on that remote object.

Example
-------

Here we create a simple ``Counter`` class, instantiate that class on one worker,
and then call methods on that class remotely.

.. code-block:: python

   class Counter:
       """ A simple class to manage an incrementing counter """
       n = 0

       def __init__(self):
           self.n = 0

       def increment(self):
           self.n += 1
           return self.n

       def add(self, x):
           self.n += x
           return self.n

   from dask.distributed import Client  # Start a Dask Client

   client = Client()

   future = client.submit(Counter, actor=True)  # Create a Counter on a worker
   counter = future.result()                    # Get back a pointer to that object

   counter
   # <Actor: Counter, key=Counter-1234abcd>

   future = counter.increment()  # Call remote method
   future.result()               # Get back result
   # 1

   future = counter.add(10)  # Call remote method
   future.result()           # Get back result
   # 11

Motivation
----------

Actors are motivated by some of the challenges of using pure task graphs.

Normal Dask computations are composed of a graph of functions.
This approach imposes a few constraints that are good for resilience, but can
negatively affect performance:

1. **State**: The functions should not mutate their inputs in-place or rely on
   global state. They should instead operate in a pure-functional manner,
   consuming inputs and producing separate outputs.
2. **Central Overhead**: The execution location and order are determined by the
   centralized scheduler. Because the scheduler is involved in every decision
   it can sometimes create a central bottleneck.

Some workloads may need to update state directly, or may involve more tiny
tasks than the scheduler can handle (the scheduler can coordinate about 4000
tasks per second).

Actors side-step both of these limitations (see the sketch below):

1. **State**: Actors can hold on to and mutate state. They are allowed to
   update their state in-place.
2. **Overhead**: Operations on actors do not inform the central scheduler, and
   so do not contribute to the 4000 task/second overhead. They also avoid an
   extra network hop and so have lower latencies.
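
For example, a single actor can absorb many small stateful updates without
creating any work for the scheduler. The following is a minimal sketch that
reuses the ``Counter`` class defined above; it assumes a running ``Client``
(here a local one started for illustration):

.. code-block:: python

   from dask.distributed import Client

   client = Client()  # start a local cluster for illustration

   # One stateful Counter living on a single worker
   counter = client.submit(Counter, actor=True).result()

   # Each call mutates the counter in place on that worker and goes straight
   # to the worker rather than through the scheduler, so these updates do not
   # appear as tasks on the scheduler
   futures = [counter.add(i) for i in range(100)]
   results = [f.result() for f in futures]  # wait for every update to finish

   counter.n
   # 4950 -- all one hundred updates were applied in place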

Semantics
---------

You create an actor by submitting a Class to run on a worker using normal Dask
computation functions like ``submit``, ``map``, ``compute``, or ``persist``,
and using the ``actors=`` keyword (or ``actor=`` on ``submit``).

.. code-block:: python

   future = client.submit(Counter, actor=True)

You can use all other keywords to these functions like ``workers=``,
``resources=``, and so on to control where this actor ends up.
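
For instance, ``workers=`` can pin the actor to a particular worker. The
address below is a placeholder; in practice you would pick one from
``client.scheduler_info()["workers"]``:

.. code-block:: python

   # Hypothetical worker address -- substitute one from your own cluster
   future = client.submit(Counter, actor=True, workers=["tcp://127.0.0.1:36245"])
   counter = future.result()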

This creates a Dask future as normal on which you can call ``.result()`` to get
the Actor once it has successfully run on a worker.

.. code-block:: python

   >>> counter = future.result()
   >>> counter
   <Actor: Counter, key=...>

A ``Counter`` object has been instantiated on one of the workers, and this
``Actor`` object serves as our proxy to it. It has the same methods and
attributes.

.. code-block:: python

   >>> dir(counter)
   ['add', 'increment', 'n']

However, accessing an attribute or calling a method will trigger a
communication to the remote worker, run the operation on that worker in a
separate thread pool, and then communicate the result back. Attribute access
blocks and returns when finished; method calls return an ``ActorFuture``
immediately.

.. code-block:: python

   >>> counter.n  # Blocks until finished
   0

   >>> future = counter.increment()  # Immediately returns an ActorFuture
   >>> future.result()               # Block until finished and result arrives
   1

Execution and Performance
-------------------------

When you call a method on an actor, your arguments get serialized and sent
to the worker that owns the actor's object. If you do this from a worker, the
communication is direct. If you do it from a Client, the communication is
direct when the Client can reach the workers (create the client with
``Client(..., direct_to_workers=True)`` if direct connections are possible);
otherwise it is proxied through the scheduler.
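
A minimal sketch of opting in to direct connections; the scheduler address is
a placeholder for your own deployment:

.. code-block:: python

   from dask.distributed import Client

   # direct_to_workers=True asks the client to talk to workers directly rather
   # than proxying through the scheduler; this only helps when the workers are
   # reachable from the client's network
   client = Client("tcp://scheduler-host:8786", direct_to_workers=True)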

The appropriate method of the Actor's object is then called in a separate
thread, the result captured, and then sent back to the calling side. Currently
workers have only a single thread for actors, but this may change in the
future.
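
Because of that single thread, long-running actor methods currently execute one
after another. A rough way to observe this, using a hypothetical ``Waiter``
class introduced only for this sketch:

.. code-block:: python

   import time

   class Waiter:
       """Hypothetical class whose only job is to sleep inside the actor thread"""
       def wait(self, seconds):
           time.sleep(seconds)
           return seconds

   waiter = client.submit(Waiter, actor=True).result()

   start = time.time()
   futures = [waiter.wait(1) for _ in range(2)]  # both calls return ActorFutures immediately
   [f.result() for f in futures]                 # block until both calls finish
   print(time.time() - start)                    # roughly 2 seconds: the calls ran back to back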

The result is sent back immediately to the calling side, and is not stored on
the worker with the actor. It is cached on the ``ActorFuture`` object.
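
In practice this means you can call ``.result()`` on the same ``ActorFuture``
more than once; only the first call waits on the round trip (the values below
are illustrative):

.. code-block:: python

   >>> future = counter.increment()
   >>> future.result()  # first call waits for the round trip
   2
   >>> future.result()  # later calls return the cached value
   2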

Worker operations currently have about 1ms of latency, on top of any network
latency that may exist.
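
If you want to see this overhead on your own cluster, timing a single blocking
round trip gives a rough estimate (numbers will vary with your network and
hardware):

.. code-block:: python

   import time

   start = time.perf_counter()
   counter.increment().result()  # one full round trip to the actor and back
   elapsed = time.perf_counter() - start
   print(f"{elapsed * 1000:.2f} ms")  # on the order of a millisecond plus network latency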

Limitations
-----------

Actors offer advanced capabilities, but with some cost:

1. **No Resilience:** No effort is made to make actor workloads resilient to
   worker failure. If the worker dies while holding an actor, that actor is
   lost forever.
2. **No Diagnostics:** Because the scheduler is not informed about actor
   computations, no diagnostics are available about these computations.
3. **No Load Balancing:** Actors are allocated onto workers evenly, without
   serious consideration given to avoiding communication.
4. **Experimental:** Actors are a new feature and subject to change without
   warning.

@@ -104,6 +104,7 @@ Contents
   :maxdepth: 1
   :caption: Additional Features

   actors
   adaptive
   asynchronous
   configuration