Skip to content

Commit

Permalink
Fix that thing where the first example in your docs is supposed to work
Browse files Browse the repository at this point in the history
Thanks to @salotz for pointing out that the first example in the docs
was broken. Though it's somewhat embarrassing this might also explain
the problem in #79 and certain issues in #59...

The solution here is to import the target RPC module using the its
unique basename and absolute filepath in the sub-actor that requires it.
Special handling for `__main__` and `__mp_main__` is needed since the
spawned subprocess will have no knowledge about these parent-
-state-specific module variables. Solution: map the modules name to the
respective module file basename in the child process since the module
variables will of course have different values in children.
  • Loading branch information
goodboy committed Jan 29, 2020
1 parent 7feef44 commit c479b86
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 20 deletions.
44 changes: 26 additions & 18 deletions tractor/_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ async def _invoke(
actor._ongoing_rpc_tasks.set()


def _get_mod_abspath(module):
return os.path.abspath(module.__file__)


class Actor:
"""The fundamental concurrency primitive.
Expand Down Expand Up @@ -178,9 +182,13 @@ def __init__(
self.uid = (name, uid or str(uuid.uuid4()))

mods = {}
for path in rpc_module_paths or ():
mod = importlib.import_module(path)
mods[path] = mod.__file__
for name in rpc_module_paths or ():
mod = importlib.import_module(name)
suffix_index = mod.__file__.find('.py')
unique_modname = os.path.basename(mod.__file__[:suffix_index])
mods[unique_modname] = _get_mod_abspath(mod)
if mod.__name__ == '__main__' or mod.__name__ == '__mp_main__':
self._main_mod = unique_modname

self.rpc_module_paths = mods
self._mods: dict = {}
Expand Down Expand Up @@ -235,32 +243,35 @@ def load_modules(self) -> None:
code (if it exists).
"""
try:
for path, absfilepath in self.rpc_module_paths.items():
log.debug(f"Attempting to import {path}")
# spec = importlib.util.spec_from_file_location(
# path, absfilepath)
# mod = importlib.util.module_from_spec(spec)
for modname, absfilepath in self.rpc_module_paths.items():
sys.path.append(os.path.dirname(absfilepath))
log.debug(f"Attempting to import {modname}@{absfilepath}")
spec = importlib.util.spec_from_file_location(
modname, absfilepath)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
self._mods[modname] = mod

# XXX append the allowed module to the python path
# which should allow for relative (at least downward)
# imports. Seems to be the only that will work currently
# to get `trio-run-in-process` to import modules we "send
# it".
sys.path.append(os.path.dirname(absfilepath))
# spec.loader.exec_module(mod)
mod = importlib.import_module(path)
self._mods[path] = mod

# if self.name != 'arbiter':
# importlib.import_module('doggy')
# from celery.contrib import rdb; rdb.set_trace()
except ModuleNotFoundError:
# it is expected the corresponding `ModuleNotExposed` error
# will be raised later
log.error(f"Failed to import {path} in {self.name}")
log.error(f"Failed to import {modname} in {self.name}")
raise

def _get_rpc_func(self, ns, funcname):
if ns == '__main__' or ns == '__mp_main__':
# lookup the specific module in the child denoted
# as `__main__`/`__mp_main__` in the parent
ns = self._main_mod
try:
return getattr(self._mods[ns], funcname)
except KeyError as err:
Expand Down Expand Up @@ -640,8 +651,6 @@ async def _async_main(
# blocks here as expected until the channel server is
# killed (i.e. this actor is cancelled or signalled by the parent)
except Exception as err:
# if self.name == 'arbiter':
# import pdb; pdb.set_trace()
if not registered_with_arbiter:
log.exception(
f"Actor errored and failed to register with arbiter "
Expand All @@ -666,8 +675,6 @@ async def _async_main(
raise

finally:
# if self.name == 'arbiter':
# import pdb; pdb.set_trace()
if registered_with_arbiter:
await self._do_unreg(arbiter_addr)
# terminate actor once all it's peers (actors that connected
Expand Down Expand Up @@ -714,7 +721,8 @@ async def _serve_forever(
)
)
log.debug(
f"Started tcp server(s) on {[l.socket for l in listeners]}") # type: ignore
f"Started tcp server(s) on {[l.socket for l in listeners]}"
) # type: ignore
self._listeners.extend(listeners)
task_status.started()

Expand Down
1 change: 0 additions & 1 deletion tractor/_spawn.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ async def new_proc(
# passed through to actor main
bind_addr: Tuple[str, int],
parent_addr: Tuple[str, int],
begin_wait_phase: trio.Event,
use_trio_run_in_process: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None:
Expand Down
1 change: 0 additions & 1 deletion tractor/_trionics.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ async def start_actor(
self.errors,
bind_addr,
parent_addr,
nursery,
)

async def run_in_actor(
Expand Down

0 comments on commit c479b86

Please sign in to comment.