Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle all threads stopped correctly #849

Merged
merged 6 commits into from
Feb 1, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ipykernel/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@
class ControlThread(Thread):

def __init__(self, **kwargs):
Thread.__init__(self, **kwargs)
Thread.__init__(self, name="Control", **kwargs)
self.io_loop = IOLoop(make_current=False)
self.pydev_do_not_trace = True
self.is_pydev_daemon_thread = True

def run(self):
self.name="Control"
blink1073 marked this conversation as resolved.
Show resolved Hide resolved
self.io_loop.make_current()
try:
self.io_loop.start()
Expand Down
54 changes: 39 additions & 15 deletions ipykernel/debugger.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ def __init__(self, log, debugpy_stream, event_callback, shell_socket, session):
self.session = session
self.is_started = False
self.event_callback = event_callback
self.stopped_queue = Queue()

self.started_debug_handlers = {}
for msg_type in Debugger.started_debug_msg_types:
Expand All @@ -300,22 +301,19 @@ def __init__(self, log, debugpy_stream, event_callback, shell_socket, session):

def _handle_event(self, msg):
if msg['event'] == 'stopped':
self.stopped_threads.add(msg['body']['threadId'])
if msg['body']['allThreadsStopped']:
self.stopped_queue.put_nowait(msg)
# Do not forward the event now, will be done in the handle_stopped_event
return
else:
self.stopped_threads.add(msg['body']['threadId'])
self.event_callback(msg)
elif msg['event'] == 'continued':
try:
if msg['allThreadsContinued']:
self.stopped_threads = set()
else:
self.stopped_threads.remove(msg['body']['threadId'])
except Exception:
# Workaround for debugpy/pydev not setting the correct threadId
# after a next request. Does not work if a the code executed on
# the shell spawns additional threads
if len(self.stopped_threads) == 1:
self.stopped_threads = set()
else:
raise Exception('threadId from continued event not in stopped threads set')
self.event_callback(msg)
if msg['body']['allThreadsContinued']:
self.stopped_threads = set()
else:
self.stopped_threads.remove(msg['body']['threadId'])
self.event_callback(msg)

async def _forward_message(self, msg):
return await self.debugpy_client.send_dap_request(msg)
Expand All @@ -334,6 +332,32 @@ def _build_variables_response(self, request, variables):
}
return reply

def _accept_stopped_thread(self, thread_name):
# TODO: identify Thread-2, Thread-3 and Thread-4. These are NOT
# Control, IOPub or Heartbeat threads
forbid_list = [
'IPythonHistorySavingThread',
'Thread-2',
'Thread-3',
'Thread-4'
]
return thread_name not in forbid_list

async def handle_stopped_event(self):
# Wait for a stopped event message in the stopped queue
# This message is used for triggering the 'threads' request
event = await self.stopped_queue.get()
req = {
'seq': event['seq'] + 1,
'type': 'request',
'command': 'threads'
}
rep = await self._forward_message(req)
for t in rep['body']['threads']:
if self._accept_stopped_thread(t['name']):
self.stopped_threads.add(t['id'])
self.event_callback(event)

@property
def tcp_client(self):
return self.debugpy_client
Expand Down
4 changes: 3 additions & 1 deletion ipykernel/heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class Heartbeat(Thread):
def __init__(self, context, addr=None):
if addr is None:
addr = ('tcp', localhost(), 0)
Thread.__init__(self)
Thread.__init__(self, name="Heartbeat")
self.context = context
self.transport, self.ip, self.port = addr
self.original_port = self.port
Expand All @@ -42,6 +42,7 @@ def __init__(self, context, addr=None):
self.daemon = True
self.pydev_do_not_trace = True
self.is_pydev_daemon_thread = True
self.name="Heartbeat"
blink1073 marked this conversation as resolved.
Show resolved Hide resolved

def pick_port(self):
if self.transport == 'tcp':
Expand Down Expand Up @@ -89,6 +90,7 @@ def _bind_socket(self):
return

def run(self):
self.name="Heartbeat"
blink1073 marked this conversation as resolved.
Show resolved Hide resolved
self.socket = self.context.socket(zmq.ROUTER)
self.socket.linger = 1000
try:
Expand Down
4 changes: 3 additions & 1 deletion ipykernel/iostream.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,11 @@ def __init__(self, socket, pipe=False):
self._events = deque()
self._event_pipes = WeakSet()
self._setup_event_pipe()
self.thread = threading.Thread(target=self._thread_main)
self.thread = threading.Thread(target=self._thread_main, name="IOPub")
self.thread.daemon = True
self.thread.pydev_do_not_trace = True
self.thread.is_pydev_daemon_thread = True
self.thread.name="IOPub"
blink1073 marked this conversation as resolved.
Show resolved Hide resolved

def _thread_main(self):
"""The inner loop that's actually run in a thread"""
Expand Down Expand Up @@ -176,6 +177,7 @@ def _check_mp_mode(self):

def start(self):
"""Start the IOPub thread"""
self.thread.name="IOPub"
blink1073 marked this conversation as resolved.
Show resolved Hide resolved
self.thread.start()
# make sure we don't prevent process exit
# I'm not sure why setting daemon=True above isn't enough, but it doesn't appear to be.
Expand Down
6 changes: 6 additions & 0 deletions ipykernel/ipkernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,19 @@ def dispatch_debugpy(self, msg):
def banner(self):
return self.shell.banner

async def poll_stopped_queue(self):
while True:
await self.debugger.handle_stopped_event()

def start(self):
self.shell.exit_now = False
if self.debugpy_stream is None:
self.log.warning("debugpy_stream undefined, debugging will not be enabled")
else:
self.debugpy_stream.on_recv(self.dispatch_debugpy, copy=False)
super().start()
if self.debugpy_stream:
asyncio.run_coroutine_threadsafe(self.poll_stopped_queue(), self.control_thread.io_loop.asyncio_loop)

def set_parent(self, ident, parent, channel='shell'):
"""Overridden from parent to tell the display hook and output streams
Expand Down
8 changes: 7 additions & 1 deletion ipykernel/tests/test_debugger.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ def test_rich_inspect_at_breakpoint(kernel_with_debug):

f(2, 3)"""


r = wait_for_debug_request(kernel_with_debug, "dumpCell", {"code": code})
source = r["body"]["sourcePath"]

Expand All @@ -246,6 +247,11 @@ def test_rich_inspect_at_breakpoint(kernel_with_debug):

kernel_with_debug.execute(code)

# Wait for stop on breakpoint
msg = {"msg_type": "", "content": {}}
while msg.get('msg_type') != 'debug_event' or msg["content"].get("event") != "stopped":
msg = kernel_with_debug.get_iopub_msg(timeout=TIMEOUT)

stacks = wait_for_debug_request(kernel_with_debug, "stackTrace", {"threadId": 1})[
"body"
]["stackFrames"]
Expand Down Expand Up @@ -276,4 +282,4 @@ def test_rich_inspect_at_breakpoint(kernel_with_debug):
def test_convert_to_long_pathname():
if sys.platform == 'win32':
from ipykernel.compiler import _convert_to_long_pathname
_convert_to_long_pathname(__file__)
_convert_to_long_pathname(__file__)