Skip to content

Commit

Permalink
Add multi-process debugging support using pdbpp
Browse files Browse the repository at this point in the history
This is the first step in addressing #113 and the initial support
of #130. Basically this allows (sub)processes to engage the `pdbpp`
debug machinery, which reads/writes the root actor's tty but only in
a FIFO semaphored way such that no two processes are using it
simultaneously. That means you can have multiple actors enter a trace or
crash and run the debugger in a sensible way without clobbering each
other's access to stdio. It required adding some "tear down hooks" to
a custom `pdbpp.Pdb` type such that we release a child's lock on the
parent on debugger exit (in this case when either the "continue" or
the "quit" command is issued to the debugger console).

There's some code left commented out in anticipation of full support for
issue #130, where we'll need to actually capture and feed stdin to the
target (remote) actor, which won't necessarily be running on the same
host.
  • Loading branch information
goodboy committed Jul 30, 2020
1 parent d752537 commit b942c91
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 60 deletions.
8 changes: 5 additions & 3 deletions debugging/mp_debug.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ async def main():
"""
async with tractor.open_nursery() as n:

# portal = await n.run_in_actor('future_self', bubble)
portal = await n.run_in_actor('future_self', bail)
portal1 = await n.run_in_actor('bubble', bubble)
portal = await n.run_in_actor('bail', bail)
# await portal.result()
# await portal1.result()

# The ``async with`` will unblock here since the 'some_linguist'
# actor has completed its main task ``cellar_door``.


if __name__ == '__main__':
tractor.run(main, loglevel='info', debug_mode=True)
tractor.run(main, loglevel='critical', debug_mode=True)
23 changes: 19 additions & 4 deletions tractor/_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,14 @@ async def _invoke(
task_status.started(cs)
await chan.send({'return': await coro, 'cid': cid})
except (Exception, trio.MultiError) as err:
log.exception("Actor errored:")

# NOTE: don't enter debug mode recursively after quitting pdb
if _state.debug_mode() and not isinstance(err, bdb.BdbQuit):
# Allow for pdb control in parent
from ._debug import post_mortem
log.exception("Actor crashed, entering debug mode:")
await post_mortem()
else:
log.exception("Actor crashed:")

# always ship errors back to caller
err_msg = pack_error(err)
Expand Down Expand Up @@ -181,6 +182,7 @@ class Actor:

# Information about `__main__` from parent
_parent_main_data: Dict[str, str]
_parent_chan_cs: Optional[trio.CancelScope] = None

def __init__(
self,
Expand Down Expand Up @@ -639,8 +641,13 @@ async def _async_main(
# processing parent requests until our server is
# 100% up and running.
if self._parent_chan:
nursery.start_soon(
self._process_messages, self._parent_chan)
self._parent_chan_cs = await nursery.start(
partial(
self._process_messages,
self._parent_chan,
shield=True,
)
)

# Register with the arbiter if we're told its addr
log.debug(f"Registering {self} for role `{self.name}`")
Expand All @@ -656,6 +663,8 @@ async def _async_main(

# Blocks here as expected until the channel server is
# killed (i.e. this actor is cancelled or signalled by the parent)
self._parent_chan_cs.cancel()

except Exception as err:
if not registered_with_arbiter:
# TODO: I guess we could try to connect back
Expand Down Expand Up @@ -708,6 +717,10 @@ async def _async_main(
# or completed
self.cancel_server()

# teardown msg loop with parent
if self._parent_chan_cs:
self._parent_chan_cs.cancel()

async def _serve_forever(
self,
*,
Expand Down Expand Up @@ -770,6 +783,8 @@ async def cancel(self) -> None:
for n in root.child_nurseries:
n.cancel_scope.cancel()

self._parent_chan_cs.cancel()

async def _cancel_task(self, cid, chan):
"""Cancel a local task by call-id / channel.
Expand Down
174 changes: 124 additions & 50 deletions tractor/_debug.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
"""
Multi-core debugging for da peeps!
"""
import pdb
import sys
import tty
from functools import partial
from typing import Awaitable, Tuple

Expand All @@ -13,6 +11,16 @@

from .log import get_logger

try:
# wtf: only exported when installed in dev mode?
import pdbpp
except ImportError:
# pdbpp is installed in regular mode...
import pdb
assert pdb.xpm, "pdbpp is not installed?"
pdbpp = pdb


log = get_logger(__name__)


Expand All @@ -25,56 +33,96 @@
)


def subactoruid2proc(
actor: 'Actor', # noqa
uid: Tuple[str, str]
) -> trio.Process:
n = actor._actoruid2nursery[uid]
_, proc, _ = n._children[uid]
return proc
_pdb_release_hook = None


class PdbwTeardown(pdbpp.Pdb):
    """``pdbpp.Pdb`` variant that fires a teardown hook on debugger exit.

    Once the stock "continue" or "quit" handling has run, the active
    config's ``teardown()`` is invoked with this debugger instance.
    """
    # TODO: figure out how to disallow recursive .set_trace() entry
    # since that'll cause deadlock for us.

    def set_quit(self):
        # regular quit handling first, then notify the config
        super().set_quit()
        self.config.teardown(self)

    def set_continue(self):
        # regular continue handling first, then notify the config
        super().set_continue()
        self.config.teardown(self)


class TractorConfig(pdbpp.DefaultConfig):
    """Custom ``pdbpp`` config that adds a debugger-exit teardown hook.

    ``sticky_by_default`` is switched on, and ``teardown()`` forwards to
    whatever callable is currently stored in the module-level
    ``_pdb_release_hook``.
    """
    sticky_by_default = True

    def teardown(self, _pdb):
        """Invoke the installed release hook, if any, with ``_pdb``.

        ``_pdb_release_hook`` starts out as ``None`` and is only assigned
        once a breakpoint has been set up; calling it blindly before then
        would raise ``TypeError`` from inside the debugger's exit path.
        """
        # look the global up at call time so a hook installed later is seen
        release_hook = _pdb_release_hook
        if release_hook is None:
            # nothing was locked on our behalf, so nothing to release
            return
        release_hook(_pdb)


# override the pdbpp config with our coolio one
pdbpp.Pdb.DefaultConfig = TractorConfig


# TODO: will be needed whenever we get to true remote debugging.
# XXX see https://github.com/goodboy/tractor/issues/130

# def subactoruid2proc(
# actor: 'Actor', # noqa
# uid: Tuple[str, str]
# ) -> trio.Process:
# n = actor._actoruid2nursery[uid]
# _, proc, _ = n._children[uid]
# return proc

# async def hijack_stdin():
# log.info(f"Hijacking stdin from {actor.uid}")

# trap std in and relay to subproc
# async_stdin = trio.wrap_file(sys.stdin)

# async with aclosing(async_stdin):
# async for msg in async_stdin:
# log.trace(f"Stdin input:\n{msg}")
# # encode to bytes
# bmsg = str.encode(msg)

# # relay bytes to subproc over pipe
# # await proc.stdin.send_all(bmsg)

# if bmsg in _pdb_exit_patterns:
# log.info("Closing stdin hijack")
# break


async def _hijack_stdin_relay_to_child(
subactor_uid: Tuple[str, str]
) -> None:
actor = tractor.current_actor()
proc = subactoruid2proc(actor, subactor_uid)

# nlb = []

async def hijack_stdin():
log.info(f"Hijacking stdin from {actor.uid}")
# try:
# # disable cooked mode
# fd = sys.stdin.fileno()
# old = tty.tcgetattr(fd)
# tty.setcbreak(fd)

# trap std in and relay to subproc
async_stdin = trio.wrap_file(sys.stdin)
debug_lock = actor.statespace.setdefault(
'_debug_lock', trio.StrictFIFOLock()
)

async with aclosing(async_stdin):
# while True:
async for msg in async_stdin:
log.trace(f"Stdin input:\n{msg}")
# nlb.append(msg)
# encode to bytes
bmsg = str.encode(msg)
log.debug(f"Actor {subactor_uid} is waiting on stdin hijack lock")
await debug_lock.acquire()
log.warning(f"Actor {subactor_uid} acquired stdin hijack lock")

# relay bytes to subproc over pipe
await proc.stdin.send_all(bmsg)
# TODO: when we get to true remote debugging
# this will deliver stdin data
try:
# indicate to child that we've locked stdio
yield 'Locked'

# line = str.encode(''.join(nlb))
# print(line)
# wait for cancellation of stream by child
await trio.sleep_forever()

if bmsg in _pdb_exit_patterns:
log.info("Closing stdin hijack")
break
# finally:
# tty.tcsetattr(fd, tty.TCSAFLUSH, old)
# TODO: for remote debugging schedule hijacking in root scope
# (see above)
# actor._root_nursery.start_soon(hijack_stdin)

# schedule hijacking in root scope
actor._root_nursery.start_soon(hijack_stdin)
finally:
if debug_lock.locked():
debug_lock.release()
log.debug(f"Actor {subactor_uid} released stdin hijack lock")


# XXX: We only make this sync in case someone wants to
Expand All @@ -84,35 +132,61 @@ def _breakpoint(debug_func) -> Awaitable[None]:
in subactors.
"""
actor = tractor.current_actor()
do_unlock = trio.Event()

async def wait_for_parent_stdin_hijack():
log.debug('Breakpoint engaged!')
async def wait_for_parent_stdin_hijack(
task_status=trio.TASK_STATUS_IGNORED
):

# TODO: need a more robust check for the "root" actor
if actor._parent_chan:
async with tractor._portal.open_portal(
actor._parent_chan,
start_msg_loop=False,
shield=True,
) as portal:
# with trio.fail_after(1):
await portal.run(
agen = await portal.run(
'tractor._debug',
'_hijack_stdin_relay_to_child',
subactor_uid=actor.uid,
)
async with aclosing(agen):
async for val in agen:
assert val == 'Locked'
task_status.started()
with trio.CancelScope(shield=True):
await do_unlock.wait()

# trigger cancellation of remote stream
break

log.debug(f"Child {actor} released parent stdio lock")

def unlock(_pdb):
do_unlock.set()

global _pdb_release_hook
_pdb_release_hook = unlock

async def _bp():
# this must be awaited by caller
await actor._root_nursery.start(
wait_for_parent_stdin_hijack
)

# block here one frame up where ``breakpoint()``
# was awaited and begin handling stdin
debug_func(actor)

# this must be awaited by caller
return wait_for_parent_stdin_hijack()
# return wait_for_parent_stdin_hijack()
return _bp()


def _set_trace(actor):
pdb.set_trace(
header=f"\nAttaching pdb to actor: {actor.uid}\n",
# start 2 levels up
log.critical(f"\nAttaching pdb to actor: {actor.uid}\n")
PdbwTeardown().set_trace(
# start 2 levels up in user code
frame=sys._getframe().f_back.f_back,
)

Expand All @@ -125,7 +199,7 @@ def _set_trace(actor):

def _post_mortem(actor):
log.error(f"\nAttaching to pdb in crashed actor: {actor.uid}\n")
pdb.post_mortem()
pdbpp.xpm(Pdb=PdbwTeardown)


post_mortem = partial(
Expand Down
7 changes: 5 additions & 2 deletions tractor/_portal.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@

@asynccontextmanager
async def maybe_open_nursery(
nursery: trio.Nursery = None
nursery: trio.Nursery = None,
shield: bool = False,
) -> typing.AsyncGenerator[trio.Nursery, Any]:
"""Create a new nursery if None provided.
Expand All @@ -32,6 +33,7 @@ async def maybe_open_nursery(
yield nursery
else:
async with trio.open_nursery() as nursery:
nursery.cancel_scope.shield = shield
yield nursery


Expand Down Expand Up @@ -316,6 +318,7 @@ async def open_portal(
channel: Channel,
nursery: Optional[trio.Nursery] = None,
start_msg_loop: bool = True,
shield: bool = False,
) -> typing.AsyncGenerator[Portal, None]:
"""Open a ``Portal`` through the provided ``channel``.
Expand All @@ -325,7 +328,7 @@ async def open_portal(
assert actor
was_connected = False

async with maybe_open_nursery(nursery) as nursery:
async with maybe_open_nursery(nursery, shield=shield) as nursery:
if not channel.connected():
await channel.connect()
was_connected = True
Expand Down
3 changes: 2 additions & 1 deletion tractor/_spawn.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ async def cancel_on_completion(
else:
log.info(
f"Cancelling {portal.channel.uid} gracefully "
"after result {result}")
f"after result {result}"
)

# cancel the process now that we have a final result
await portal.cancel_actor()
Expand Down

0 comments on commit b942c91

Please sign in to comment.