Skip to content

Commit

Permalink
Init-support for "multi homed" transports
Browse files Browse the repository at this point in the history
Since we'd like to eventually allow a diverse set of transport
(protocol) methods and stacks, and a multi-peer discovery system for
distributed actor-tree applications, this reworks all runtime internals
to support multi-homing for any given tree on a logical host. In other
words any actor can now bind its transport server (currently only
unsecured TCP + `msgspec`) to more then one address available in its
(linux) network namespace. Further, registry actors (now dubbed
"registars" instead of "arbiters") can also similarly bind to multiple
network addresses and provide discovery services to remote actors via
multiple addresses which can now be provided at runtime startup.

Deats:
- adjust `._runtime` internals to use a `list[tuple[str, int]]` (and
  thus pluralized) socket address sequence where applicable for transport
  server socket binds, now exposed via `Actor.accept_addrs`:
  - `Actor.__init__()` now takes a `registry_addrs: list`.
  - `Actor.is_arbiter` -> `.is_registrar`.
  - `._arb_addr` -> `._reg_addrs: list[tuple]`.
  - always reg and de-reg from all registrars in `async_main()`.
  - only set the global runtime var `'_root_mailbox'` to the loopback
    address since normally all in-tree processes should have access to
    it, right?
  - `._serve_forever()` task now takes `listen_sockaddrs: list[tuple]`
- make `open_root_actor()` take a `registry_addrs: list[tuple[str, int]]`
  and defaults when not passed.
- change `ActorNursery.start_..()` methods take `bind_addrs: list` and
  pass down through the spawning layer(s) via the parent-seed-msg.
- generalize all `._discovery()` APIs to accept `registry_addrs`-like
  inputs and move all relevant subsystems to adopt the "registry" style
  naming instead of "arbiter":
  - make `find_actor()` support batched concurrent portal queries over
    all provided input addresses using `.trionics.gather_contexts()` Bo
  - syntax: move to using `async with <tuples>` 3.9+ style chained
    @acms.
  - a general modernization of the code to a python 3.9+ style.
  - start deprecation and change to "registry" naming / semantics:
    - `._discovery.get_arbiter()` -> `.get_registry()`
  • Loading branch information
goodboy committed Sep 27, 2023
1 parent ee151b0 commit 3d0e955
Show file tree
Hide file tree
Showing 8 changed files with 512 additions and 248 deletions.
192 changes: 142 additions & 50 deletions tractor/_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,19 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""
Actor discovery API.
Discovery (protocols) API for automatic addressing and location
management of (service) actors.
"""
from __future__ import annotations
from typing import (
Optional,
Union,
AsyncGenerator,
TYPE_CHECKING,
)
from contextlib import asynccontextmanager as acm
import warnings

from .trionics import gather_contexts
from ._ipc import _connect_chan, Channel
from ._portal import (
Portal,
Expand All @@ -34,13 +37,19 @@
from ._state import current_actor, _runtime_vars


@acm
async def get_arbiter(
if TYPE_CHECKING:
from ._runtime import Actor


@acm
async def get_registry(
host: str,
port: int,

) -> AsyncGenerator[Union[Portal, LocalPortal], None]:
) -> AsyncGenerator[
Portal | LocalPortal | None,
None,
]:
'''
Return a portal instance connected to a local or remote
arbiter.
Expand All @@ -51,93 +60,160 @@ async def get_arbiter(
if not actor:
raise RuntimeError("No actor instance has been defined yet?")

if actor.is_arbiter:
if actor.is_registrar:
# we're already the arbiter
# (likely a re-entrant call from the arbiter actor)
yield LocalPortal(actor, Channel((host, port)))
yield LocalPortal(
actor,
Channel((host, port))
)
else:
async with _connect_chan(host, port) as chan:
async with (
_connect_chan(host, port) as chan,
open_portal(chan) as regstr_ptl,
):
yield regstr_ptl

async with open_portal(chan) as arb_portal:

yield arb_portal
# TODO: deprecate and remove _arbiter form
get_arbiter = get_registry


@acm
async def get_root(
**kwargs,
) -> AsyncGenerator[Portal, None]:

# TODO: rename mailbox to `_root_maddr` when we finally
# add and impl libp2p multi-addrs?
host, port = _runtime_vars['_root_mailbox']
assert host is not None

async with _connect_chan(host, port) as chan:
async with open_portal(chan, **kwargs) as portal:
yield portal
async with (
_connect_chan(host, port) as chan,
open_portal(chan, **kwargs) as portal,
):
yield portal


@acm
async def query_actor(
name: str,
arbiter_sockaddr: Optional[tuple[str, int]] = None,
arbiter_sockaddr: tuple[str, int] | None = None,
regaddr: tuple[str, int] | None = None,

) -> AsyncGenerator[tuple[str, int], None]:
) -> AsyncGenerator[
tuple[str, int] | None,
None,
]:
'''
Simple address lookup for a given actor name.
Make a transport address lookup for an actor name to a specific
registrar.
Returns the (socket) address or ``None``.
Returns the (socket) address or ``None`` if no entry under that
name exists for the given registrar listening @ `regaddr`.
'''
actor = current_actor()
async with get_arbiter(
*arbiter_sockaddr or actor._arb_addr
) as arb_portal:
actor: Actor = current_actor()
if (
name == 'registrar'
and actor.is_registrar
):
raise RuntimeError(
'The current actor IS the registry!?'
)

sockaddr = await arb_portal.run_from_ns(
if arbiter_sockaddr is not None:
warnings.warn(
'`tractor.query_actor(regaddr=<blah>)` is deprecated.\n'
'Use `registry_addrs: list[tuple]` instead!',
DeprecationWarning,
stacklevel=2,
)
regaddr: list[tuple[str, int]] = arbiter_sockaddr

regstr: Portal
async with get_registry(
*(regaddr or actor._reg_addrs[0])
) as regstr:

# TODO: return portals to all available actors - for now
# just the last one that registered
sockaddr: tuple[str, int] = await regstr.run_from_ns(
'self',
'find_actor',
name=name,
)

# TODO: return portals to all available actors - for now just
# the last one that registered
if name == 'arbiter' and actor.is_arbiter:
raise RuntimeError("The current actor is the arbiter")

yield sockaddr if sockaddr else None
yield sockaddr


@acm
async def find_actor(
name: str,
arbiter_sockaddr: tuple[str, int] | None = None
arbiter_sockaddr: tuple[str, int] | None = None,
registry_addrs: list[tuple[str, int]] | None = None,

) -> AsyncGenerator[Optional[Portal], None]:
only_first: bool = True,

) -> AsyncGenerator[
Portal | list[Portal] | None,
None,
]:
'''
Ask the arbiter to find actor(s) by name.
Returns a connected portal to the last registered matching actor
known to the arbiter.
'''
async with query_actor(
name=name,
arbiter_sockaddr=arbiter_sockaddr,
) as sockaddr:

if sockaddr:
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
yield portal
else:
if arbiter_sockaddr is not None:
warnings.warn(
'`tractor.find_actor(arbiter_sockaddr=<blah>)` is deprecated.\n'
'Use `registry_addrs: list[tuple]` instead!',
DeprecationWarning,
stacklevel=2,
)
registry_addrs: list[tuple[str, int]] = [arbiter_sockaddr]

@acm
async def maybe_open_portal_from_reg_addr(
addr: tuple[str, int],
):
async with query_actor(
name=name,
regaddr=addr,
) as sockaddr:
if sockaddr:
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
yield portal
else:
yield None

async with gather_contexts(
mngrs=list(
maybe_open_portal_from_reg_addr(addr)
for addr in registry_addrs
)
) as maybe_portals:
print(f'Portalz: {maybe_portals}')
if not maybe_portals:
yield None
return

portals: list[Portal] = list(maybe_portals)
if only_first:
yield portals[0]

else:
yield portals


@acm
async def wait_for_actor(
name: str,
arbiter_sockaddr: tuple[str, int] | None = None,
# registry_addr: tuple[str, int] | None = None,
registry_addr: tuple[str, int] | None = None,

) -> AsyncGenerator[Portal, None]:
'''
Expand All @@ -146,17 +222,33 @@ async def wait_for_actor(
A portal to the first registered actor is returned.
'''
actor = current_actor()

async with get_arbiter(
*arbiter_sockaddr or actor._arb_addr,
) as arb_portal:
sockaddrs = await arb_portal.run_from_ns(
actor: Actor = current_actor()

if arbiter_sockaddr is not None:
warnings.warn(
'`tractor.wait_for_actor(arbiter_sockaddr=<foo>)` is deprecated.\n'
'Use `registry_addr: tuple` instead!',
DeprecationWarning,
stacklevel=2,
)
registry_addr: list[tuple[str, int]] = [
arbiter_sockaddr,
]

# TODO: use `.trionics.gather_contexts()` like
# above in `find_actor()` as well?
async with get_registry(
*(registry_addr or actor._reg_addrs[0]), # first if not passed
) as reg_portal:
sockaddrs = await reg_portal.run_from_ns(
'self',
'wait_for_actor',
name=name,
)
sockaddr = sockaddrs[-1]

# get latest registered addr by default?
# TODO: offer multi-portal yields in multi-homed case?
sockaddr: tuple[str, int] = sockaddrs[-1]

async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
Expand Down
14 changes: 8 additions & 6 deletions tractor/_entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@

def _mp_main(

actor: Actor, # type: ignore
accept_addr: tuple[str, int],
actor: Actor,
accept_addrs: list[tuple[str, int]],
forkserver_info: tuple[Any, Any, Any, Any, Any],
start_method: SpawnMethodKey,
parent_addr: tuple[str, int] | None = None,
Expand Down Expand Up @@ -77,8 +77,8 @@ def _mp_main(
log.debug(f"parent_addr is {parent_addr}")
trio_main = partial(
async_main,
actor,
accept_addr,
actor=actor,
accept_addrs=accept_addrs,
parent_addr=parent_addr
)
try:
Expand All @@ -96,7 +96,7 @@ def _mp_main(

def _trio_main(

actor: Actor, # type: ignore
actor: Actor,
*,
parent_addr: tuple[str, int] | None = None,
infect_asyncio: bool = False,
Expand Down Expand Up @@ -132,7 +132,9 @@ def _trio_main(
else:
trio.run(trio_main)
except KeyboardInterrupt:
log.cancel(f"Actor {actor.uid} received KBI")
log.cancel(
f'Actor@{actor.uid} received KBI'
)

finally:
log.info(f"Actor {actor.uid} terminated")
4 changes: 3 additions & 1 deletion tractor/_ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,9 @@ def connected(self) -> bool:

@asynccontextmanager
async def _connect_chan(
host: str, port: int
host: str,
port: int

) -> typing.AsyncGenerator[Channel, None]:
'''
Create and connect a channel with disconnect on context manager
Expand Down
7 changes: 6 additions & 1 deletion tractor/_portal.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,12 @@ class LocalPortal:
actor: 'Actor' # type: ignore # noqa
channel: Channel

async def run_from_ns(self, ns: str, func_name: str, **kwargs) -> Any:
async def run_from_ns(
self,
ns: str,
func_name: str,
**kwargs,
) -> Any:
'''
Run a requested local function from a namespace path and
return it's result.
Expand Down
Loading

0 comments on commit 3d0e955

Please sign in to comment.