Skip to content

Commit

Permalink
Avoid deadlock when two tasks are concurrently waiting for an unresol…
Browse files Browse the repository at this point in the history
…ved `ActorFuture` (#5709)

Thank you @graingert ! This looks great. Sorry for us taking so long with reviewing it.
  • Loading branch information
graingert authored Feb 18, 2022
1 parent c37e7fc commit b0dd9db
Show file tree
Hide file tree
Showing 5 changed files with 212 additions and 67 deletions.
2 changes: 1 addition & 1 deletion distributed/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from dask.config import config # type: ignore

from ._version import get_versions
from .actor import Actor, ActorFuture
from .actor import Actor, BaseActorFuture
from .client import (
Client,
CompatibleExecutor,
Expand Down
158 changes: 119 additions & 39 deletions distributed/actor.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,63 @@
from __future__ import annotations

import abc
import asyncio
import functools
import sys
import threading
from dataclasses import dataclass
from datetime import timedelta
from typing import Generic, Literal, NoReturn, TypeVar

from tornado.ioloop import IOLoop

from .client import Future
from .protocol import to_serialize
from .utils import iscoroutinefunction, sync, thread_state
from .utils_comm import WrappedKey
from .worker import get_client, get_worker

_T = TypeVar("_T")

if sys.version_info >= (3, 9):
from collections.abc import Awaitable, Generator
else:
from typing import Awaitable, Generator

if sys.version_info >= (3, 10):
from asyncio import Event as _LateLoopEvent
else:
# In python 3.10 asyncio.Lock and other primitives no longer support
# passing a loop kwarg to bind to a loop running in another thread
# e.g. calling from Client(asynchronous=False). Instead the loop is bound
# as late as possible: when calling any methods that wait on or wake
# Future instances. See: https://bugs.python.org/issue42392
class _LateLoopEvent:
def __init__(self) -> None:
self._event: asyncio.Event | None = None

def set(self) -> None:
if self._event is None:
self._event = asyncio.Event()

self._event.set()

def is_set(self) -> bool:
return self._event is not None and self._event.is_set()

async def wait(self) -> bool:
if self._event is None:
self._event = asyncio.Event()

return await self._event.wait()


class Actor(WrappedKey):
"""Controls an object on a remote worker
An actor allows remote control of a stateful object living on a remote
worker. Method calls on this object trigger operations on the remote
object and return ActorFutures on which we can block to get results.
object and return BaseActorFutures on which we can block to get results.
Examples
--------
Expand All @@ -36,7 +79,7 @@ class Actor(WrappedKey):
>>> counter
<Actor: Counter, key=Counter-1234abcd>
Calling methods on this object immediately returns deferred ``ActorFuture``
Calling methods on this object immediately returns deferred ``BaseActorFuture``
objects. You can call ``.result()`` on these objects to block and get the
result of the function call.
Expand Down Expand Up @@ -140,9 +183,7 @@ def __getattr__(self, key):
return attr

elif callable(attr):
return lambda *args, **kwargs: ActorFuture(
None, self._io_loop, result=attr(*args, **kwargs)
)
return lambda *args, **kwargs: EagerActorFuture(attr(*args, **kwargs))
else:
return attr

Expand All @@ -166,16 +207,17 @@ async def run_actor_function_on_worker():
return await run_actor_function_on_worker()
else: # pragma: no cover
raise OSError("Unable to contact Actor's worker")
return result
if result["status"] == "OK":
return _OK(result["result"])
return _Error(result["exception"])

q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
actor_future = ActorFuture(io_loop=self._io_loop)

async def wait_then_add_to_queue():
x = await run_actor_function_on_worker()
await q.put(x)
async def wait_then_set_result():
actor_future._set_result(await run_actor_function_on_worker())

self._io_loop.add_callback(wait_then_add_to_queue)
return ActorFuture(q, self._io_loop)
self._io_loop.add_callback(wait_then_set_result)
return actor_future

return func

Expand Down Expand Up @@ -215,10 +257,10 @@ async def func(**msg):
return func


class ActorFuture:
class BaseActorFuture(abc.ABC, Awaitable[_T]):
"""Future to an actor's method call
Whenever you call a method on an Actor you get an ActorFuture immediately
Whenever you call a method on an Actor you get a BaseActorFuture immediately
while the computation happens in the background. You can call ``.result``
to block and collect the full result
Expand All @@ -227,34 +269,72 @@ class ActorFuture:
Actor
"""

def __init__(self, q, io_loop, result=None):
self.q = q
self.io_loop = io_loop
if result:
self._cached_result = result
self.status = "pending"
@abc.abstractmethod
def result(self, timeout: str | timedelta | float | None = None) -> _T:
...

@abc.abstractmethod
def done(self) -> bool:
...

def __repr__(self) -> Literal["<ActorFuture>"]:
return "<ActorFuture>"


@dataclass(frozen=True, eq=False)
class EagerActorFuture(BaseActorFuture[_T]):
"""Future to an actor's method call when an actor calls another actor on the same worker"""

def __await__(self):
_result: _T

def __await__(self) -> Generator[object, None, _T]:
return self._result
yield

def result(self, timeout: object = None) -> _T:
return self._result

def done(self) -> Literal[True]:
return True


@dataclass(frozen=True, eq=False)
class _OK(Generic[_T]):
_v: _T

def unwrap(self) -> _T:
return self._v


@dataclass(frozen=True, eq=False)
class _Error:
_e: Exception

def unwrap(self) -> NoReturn:
raise self._e


class ActorFuture(BaseActorFuture[_T]):
def __init__(self, io_loop: IOLoop):
self._io_loop = io_loop
self._event = _LateLoopEvent()
self._out: _Error | _OK[_T] | None = None

def __await__(self) -> Generator[object, None, _T]:
return self._result().__await__()

def done(self):
return self.status != "pending"
def done(self) -> bool:
return self._event.is_set()

async def _result(self, raiseit=True):
if not hasattr(self, "_cached_result"):
out = await self.q.get()
if out["status"] == "OK":
self.status = "finished"
self._cached_result = out["result"]
else:
self.status = "error"
self._cached_result = out["exception"]
if self.status == "error":
raise self._cached_result
return self._cached_result
async def _result(self) -> _T:
await self._event.wait()
out = self._out
assert out is not None
return out.unwrap()

def result(self, timeout=None):
return sync(self.io_loop, self._result, callback_timeout=timeout)
def _set_result(self, out: _Error | _OK[_T]) -> None:
self._out = out
self._event.set()

def __repr__(self):
return "<ActorFuture>"
def result(self, timeout: str | timedelta | float | None = None) -> _T:
return sync(self._io_loop, self._result, callback_timeout=timeout)
4 changes: 2 additions & 2 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4966,11 +4966,11 @@ def update(self, futures):
"""Add multiple futures to the collection.
The added futures will emit from the iterator once they finish"""
from .actor import ActorFuture
from .actor import BaseActorFuture

with self.lock:
for f in futures:
if not isinstance(f, (Future, ActorFuture)):
if not isinstance(f, (Future, BaseActorFuture)):
raise TypeError("Input must be a future, got %s" % f)
self.futures[f] += 1
self.loop.add_callback(self._track_future, f)
Expand Down
Loading

0 comments on commit b0dd9db

Please sign in to comment.