Skip to content

Commit

Permalink
Fixed a number of async issues and enhanced process cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
MSeal committed Mar 8, 2020
1 parent 507876f commit 83d18bf
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 28 deletions.
139 changes: 114 additions & 25 deletions nbclient/client.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import datetime
import base64
import inspect
from textwrap import dedent

# For python 3.5 compatibility we import asynccontextmanager from async_generator instead of
# contextlib, and we `await yield_()` instead of just `yield`
from async_generator import asynccontextmanager, async_generator, yield_
from contextlib import contextmanager

from time import monotonic
from queue import Empty
Expand All @@ -15,7 +17,13 @@

from nbformat.v4 import output_from_msg

from .exceptions import CellTimeoutError, DeadKernelError, CellExecutionComplete, CellExecutionError
from .exceptions import (
CellControlSignal,
CellTimeoutError,
DeadKernelError,
CellExecutionComplete,
CellExecutionError
)


def timestamp():
Expand Down Expand Up @@ -308,7 +316,48 @@ def start_kernel_manager(self):
self.km.client_class = 'jupyter_client.asynchronous.AsyncKernelClient'
return self.km

async def start_new_kernel_client(self, **kwargs):
def _cleanup_kernel(self):
try:
self.kc.shutdown() # Send a polite shutdown request
loop = get_loop()
try:
# Queue the manager to kill the process, sometimes the built-in and above
# shutdowns have not been successful or called yet, so give a direct kill
# call here and recover gracefully if it's already dead.
try:
# For AsyncKernelManager
loop.run_until_complete(self.km.shutdown_kernel(now=True))
except TypeError:
self.km.shutdown_kernel(now=True)
except RuntimeError as e:
# The error isn't specialized, so we have to check the message
if 'No kernel is running!' not in str(e):
raise
finally:
# Remove any state left over even if we failed to stop the kernel
self.km.cleanup()
self.kc.stop_channels()
self.kc = None

def start_new_kernel_client(self, **kwargs):
"""Creates a new kernel client.
Parameters
----------
kwargs :
Any options for `self.kernel_manager_class.start_kernel()`. Because
that defaults to KernelManager, this will likely include options
accepted by `KernelManager.start_kernel()``, which includes `cwd`.
Returns
-------
kc : KernelClient
Kernel client as created by the kernel manager `km`.
"""
loop = get_loop()
return loop.run_until_complete(self.async_start_new_kernel_client(**kwargs))

async def async_start_new_kernel_client(self, **kwargs):
"""Creates a new kernel client.
Parameters
Expand All @@ -330,22 +379,47 @@ async def start_new_kernel_client(self, **kwargs):
if self.km.ipykernel and self.ipython_hist_file:
self.extra_arguments += ['--HistoryManager.hist_file={}'.format(self.ipython_hist_file)]

self.km.start_kernel(extra_arguments=self.extra_arguments, **kwargs)
# Support AsyncKernelManager
if inspect.iscoroutinefunction(self.km.start_kernel):
await self.km.start_kernel(extra_arguments=self.extra_arguments, **kwargs)
else:
self.km.start_kernel(extra_arguments=self.extra_arguments, **kwargs)

self.kc = self.km.client()
self.kc.start_channels()
try:
await self.kc.wait_for_ready(timeout=self.startup_timeout)
except RuntimeError:
self.kc.stop_channels()
self.km.shutdown_kernel()
self._cleanup_kernel()
raise
self.kc.allow_stdin = False
return self.kc

@contextmanager
def setup_kernel(self, **kwargs):
"""
Context manager for setting up the kernel to execute a notebook.
The assigns the Kernel Manager (`self.km`) if missing and Kernel Client(`self.kc`).
When control returns from the yield it stops the client's zmq channels, and shuts
down the kernel.
"""
# Can't use run_until_complete on an asynccontextmanager function :(
# yield from get_loop().run_until_complete(self.async_setup_kernel(**kwargs))
if self.km is None:
self.start_kernel_manager()

if not self.km.has_kernel:
self.start_new_kernel_client(**kwargs)
try:
yield
finally:
self._cleanup_kernel()

@asynccontextmanager
@async_generator # needed for python 3.5 compatibility
async def setup_kernel(self, **kwargs):
async def async_setup_kernel(self, **kwargs):
"""
Context manager for setting up the kernel to execute a notebook.
Expand All @@ -358,12 +432,11 @@ async def setup_kernel(self, **kwargs):
self.start_kernel_manager()

if not self.km.has_kernel:
await self.start_new_kernel_client(**kwargs)
await self.async_start_new_kernel_client(**kwargs)
try:
await yield_(None) # would just yield in python >3.5
finally:
self.kc.stop_channels()
self.kc = None
self._cleanup_kernel()

def execute(self, **kwargs):
"""
Expand All @@ -388,15 +461,15 @@ async def async_execute(self, **kwargs):
"""
self.reset_execution_trackers()

async with self.setup_kernel(**kwargs):
async with self.async_setup_kernel(**kwargs):
self.log.info("Executing notebook with kernel: %s" % self.kernel_name)
for index, cell in enumerate(self.nb.cells):
# Ignore `'execution_count' in content` as it's always 1
# when store_history is False
await self.async_execute_cell(
cell, index, execution_count=self.code_cells_executed + 1
)
info_msg = await self._wait_for_reply(self.kc.kernel_info())
info_msg = await self.async_wait_for_reply(self.kc.kernel_info())
self.nb.metadata['language_info'] = info_msg['content']['language_info']
self.set_widgets_metadata()

Expand Down Expand Up @@ -444,7 +517,7 @@ def _update_display_id(self, display_id, msg):
outputs[output_idx]['data'] = out['data']
outputs[output_idx]['metadata'] = out['metadata']

async def _poll_for_reply(self, msg_id, cell, timeout, task_poll_output_msg):
async def _async_poll_for_reply(self, msg_id, cell, timeout, task_poll_output_msg):
if timeout is not None:
deadline = monotonic() + timeout
while True:
Expand All @@ -469,9 +542,9 @@ async def _poll_for_reply(self, msg_id, cell, timeout, task_poll_output_msg):
except Empty:
# received no message, check if kernel is still alive
self._check_alive()
self._handle_timeout(timeout, cell)
await self._async_handle_timeout(timeout, cell)

async def _poll_output_msg(self, parent_msg_id, cell, cell_index):
async def _async_poll_output_msg(self, parent_msg_id, cell, cell_index):
while True:
msg = await self.kc.iopub_channel.get_msg(timeout=None)
if msg['parent_header'].get('msg_id') == parent_msg_id:
Expand All @@ -492,11 +565,14 @@ def _get_timeout(self, cell):

return timeout

def _handle_timeout(self, timeout, cell=None):
async def _async_handle_timeout(self, timeout, cell=None):
self.log.error("Timeout waiting for execute reply (%is)." % timeout)
if self.interrupt_on_timeout:
self.log.error("Interrupting kernel")
self.km.interrupt_kernel()
if inspect.iscoroutinefunction(self.km.interrupt_kernel):
await self.km.interrupt_kernel()
else:
self.km.interrupt_kernel()
else:
raise CellTimeoutError.error_from_timeout_and_cell(
"Cell execution timed out", timeout, cell
Expand All @@ -507,19 +583,25 @@ def _check_alive(self):
self.log.error("Kernel died while waiting for execute reply.")
raise DeadKernelError("Kernel died")

async def _wait_for_reply(self, msg_id, cell=None):
def _wait_for_reply(self, msg_id, cell=None):
# Needed by papermill until it updates to consume async_wait
loop = get_loop()
return loop.run_until_complete(
self.async_wait_for_reply(msg_id, cell=cell)
)

async def async_wait_for_reply(self, msg_id, cell=None):
# wait for finish, with timeout
timeout = self._get_timeout(cell)
cummulative_time = 0
self.shell_timeout_interval = 5
while True:
try:
msg = await self.kc.shell_channel.get_msg(timeout=self.shell_timeout_interval)
except Empty:
self._check_alive()
cummulative_time += self.shell_timeout_interval
if timeout and cummulative_time > timeout:
self._handle_timeout(timeout, cell)
await self._async_handle_timeout(timeout, cell)
break
else:
if msg['parent_header'].get('msg_id') == msg_id:
Expand Down Expand Up @@ -639,12 +721,19 @@ async def async_execute_cell(self, cell, cell_index, execution_count=None, store
cell.outputs = []
self.clear_before_next_output = False

task_poll_output_msg = asyncio.ensure_future(
self._poll_output_msg(parent_msg_id, cell, cell_index)
)
exec_reply = await self._poll_for_reply(
parent_msg_id, cell, exec_timeout, task_poll_output_msg
)
task_poll_output_msg = asyncio.ensure_future(self._async_poll_output_msg(parent_msg_id, cell, cell_index))
try:
exec_reply = await self._async_poll_for_reply(
parent_msg_id, cell, exec_timeout, task_poll_output_msg
)
except Exception as e:
# Best effort to cancel request if it hasn't been resolved
try:
# Check if the task_poll_output is doing the raising for us
if not isinstance(e, CellControlSignal):
task_poll_output_msg.cancel()
finally:
raise

if execution_count:
cell['execution_count'] = execution_count
Expand Down
15 changes: 12 additions & 3 deletions nbclient/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
class CellTimeoutError(TimeoutError):
class CellControlSignal(Exception):
"""
A custom exception used to indicate that the exception is used for cell
control actions (not the best model, but it's needed to cover existing
behvior without major refactors).
"""
pass


class CellTimeoutError(TimeoutError, CellControlSignal):
"""
A custom exception to capture when a cell has timed out during execution.
"""
Expand All @@ -21,7 +30,7 @@ class DeadKernelError(RuntimeError):
pass


class CellExecutionComplete(Exception):
class CellExecutionComplete(CellControlSignal):
"""
Used as a control signal for cell execution across execute_cell and
process_message function calls. Raised when all execution requests
Expand All @@ -32,7 +41,7 @@ class CellExecutionComplete(Exception):
pass


class CellExecutionError(Exception):
class CellExecutionError(CellControlSignal):
"""
Custom exception to propagate exceptions that are raised during
notebook execution to the caller. This is mostly useful when
Expand Down
10 changes: 10 additions & 0 deletions nbclient/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,16 @@ def get_time_from_str(s):
assert status_idle - cell_end < delta


def test_synchronous_setup_kernel():
nb = nbformat.v4.new_notebook()
executor = NotebookClient(nb)
with executor.setup_kernel():
# Prove it initalized client
assert executor.kc is not None
# Prove it removed the client (and hopefully cleaned up)
assert executor.kc is None


class TestExecute(NBClientTestsBase):
"""Contains test functions for execute.py"""

Expand Down

0 comments on commit 83d18bf

Please sign in to comment.