Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] use new KernelClient.execute_interactive #360

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 33 additions & 78 deletions nbconvert/preprocessors/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,17 @@
import os
from textwrap import dedent

try:
from queue import Empty # Py 3
except ImportError:
from Queue import Empty # Py 2

from traitlets import List, Unicode, Bool, Enum, Any

from nbformat.v4 import output_from_msg
from .base import Preprocessor
from ..utils.exceptions import ConversionException
from traitlets import Integer

try:
TimeoutError
except NameError:
TimeoutError = RuntimeError

class CellExecutionError(ConversionException):
"""
Expand Down Expand Up @@ -212,86 +211,42 @@ def preprocess_cell(self, cell, resources, cell_index):


def run_cell(self, cell):
msg_id = self.kc.execute(cell.source)
self.log.debug("Executing cell:\n%s", cell.source)
# wait for finish, with timeout
while True:
try:
if self.timeout_func is not None:
timeout = self.timeout_func(cell)
else:
timeout = self.timeout

if not timeout or timeout < 0:
timeout = None
msg = self.kc.shell_channel.get_msg(timeout=timeout)
except Empty:
self.log.error(
"Timeout waiting for execute reply (%is)." % self.timeout)
if self.interrupt_on_timeout:
self.log.error("Interrupting kernel")
self.km.interrupt_kernel()
break
else:
try:
exception = TimeoutError
except NameError:
exception = RuntimeError
raise exception(
"Cell execution timed out, see log for details.")

if msg['parent_header'].get('msg_id') == msg_id:
break
else:
# not our reply
continue

outs = []

while True:
try:
# We've already waited for execute_reply, so all output
# should already be waiting. However, on slow networks, like
# in certain CI systems, waiting < 1 second might miss messages.
# So long as the kernel sends a status:idle message when it
# finishes, we won't actually have to wait this long, anyway.
msg = self.kc.iopub_channel.get_msg(timeout=4)
except Empty:
self.log.warn("Timeout waiting for IOPub output")
if self.raise_on_iopub_timeout:
raise RuntimeError("Timeout waiting for IOPub output")
else:
break
if msg['parent_header'].get('msg_id') != msg_id:
# not an output from our execution
continue

msg_type = msg['msg_type']
self.log.debug("output: %s", msg_type)
content = msg['content']

# set the prompt number for the input and the output
if 'execution_count' in content:
cell['execution_count'] = content['execution_count']

if msg_type == 'status':
if content['execution_state'] == 'idle':
break
else:
continue
elif msg_type == 'execute_input':
continue
elif msg_type == 'clear_output':
outs = []
continue
elif msg_type.startswith('comm'):
continue
def output_hook(msg):
msg_type = msg['header']['msg_type']
if 'execution_count' in msg['content']:
cell['execution_count'] = msg['content']['execution_count']
if msg_type in ('status', 'execute_input') or msg_type.startswith('comm'):
return
elif msg_type =='clear_output':
outs[:] = []
return

try:
out = output_from_msg(msg)
except ValueError:
self.log.error("unhandled iopub msg: " + msg_type)
else:
outs.append(out)

if self.timeout_func is not None:
timeout = self.timeout_func(cell)
else:
timeout = self.timeout
if timeout is not None and timeout < 0:
timeout = None
self.log.debug("Executing cell:\n%s", cell.source)
try:
reply = self.kc.execute_interactive(cell.source, timeout=timeout,
output_hook=output_hook
)
except TimeoutError:
if self.interrupt_on_timeout:
self.log.error("Interrupting kernel")
self.km.interrupt_kernel()
# TODO: flush output
# FIXME: should `flushing output` be a method on Client?
else:
raise

return outs
2 changes: 1 addition & 1 deletion nbconvert/preprocessors/tests/test_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def test_timeout_func(self):
exception = RuntimeError

def timeout_func(source):
return 10
return 1

assert_raises(exception, self.run_notebook, filename, dict(timeout_func=timeout_func), res)

Expand Down