diff --git a/jupyter_client/blocking/client.py b/jupyter_client/blocking/client.py index 37e83dbf1..fb0a58481 100644 --- a/jupyter_client/blocking/client.py +++ b/jupyter_client/blocking/client.py @@ -5,17 +5,31 @@ # Copyright (c) Jupyter Development Team. # Distributed under the terms of the Modified BSD License. +from __future__ import print_function + try: from queue import Empty # Python 3 except ImportError: from Queue import Empty # Python 2 +import sys import time +try: + monotonic = time.monotonic +except NameError: # py2 + monotonic = time.clock # close enough + from traitlets import Type from jupyter_client.channels import HBChannel from jupyter_client.client import KernelClient from .channels import ZMQSocketChannel +try: + TimeoutError +except NameError: + # py2 + TimeoutError = RuntimeError + class BlockingKernelClient(KernelClient): """A BlockingKernelClient """ @@ -74,3 +88,110 @@ def wait_for_ready(self, timeout=None): iopub_channel_class = Type(ZMQSocketChannel) stdin_channel_class = Type(ZMQSocketChannel) hb_channel_class = Type(HBChannel) + + def run(self, code, silent=False, store_history=True, + user_expressions=None, stop_on_error=True, + timeout=None, + ): + """Execute code in the kernel, redisplaying output. + + Wraps a call to `.execute`, capturing and redisplaying any output. + + Parameters + ---------- + code : str + A string of code in the kernel's language. + + silent : bool, optional (default False) + If set, the kernel will execute the code as quietly possible, and + will force store_history to be False. + + store_history : bool, optional (default True) + If set, the kernel will store command history. This is forced + to be False if silent is True. + + user_expressions : dict, optional + A dict mapping names to expressions to be evaluated in the user's + dict. The expression values are returned as strings formatted using + :func:`repr`. + + stop_on_error: bool, optional (default True) + Flag whether to abort the execution queue, if an exception is encountered. + timeout: int or None (default None) + Timeout (in seconds) to wait for output. If None, wait forever. + + Returns + ------- + reply: dict + The execute_reply message. + """ + if not self.iopub_channel.is_alive(): + raise RuntimeError("IOPub channel must be running to receive output") + msg_id = self.execute(code, + silent=silent, + store_history=store_history, + user_expressions=user_expressions, + stop_on_error=stop_on_error, + ) + + if 'IPython' in sys.modules: + from IPython import get_ipython + ip = get_ipython() + in_kernel = getattr(ip, 'kernel', False) + if in_kernel: + socket = ip.display_pub.pub_socket + session = ip.display_pub.session + parent_header = ip.display_pub.parent_header + else: + in_kernel = False + + # set deadline based on timeout + start = monotonic() + if timeout is not None: + deadline = monotonic() + timeout + + # wait for output and redisplay it + while True: + if timeout is not None: + timeout = max(0, deadline - monotonic()) + try: + msg = self.get_iopub_msg(timeout=timeout) + except Empty: + raise TimeoutError("Timeout waiting for IPython output") + + if msg['parent_header'].get('msg_id') != msg_id: + # not from my request + continue + msg_type = msg['header']['msg_type'] + content = msg['content'] + if msg_type == 'status': + if content['execution_state'] == 'idle': + # idle means output is done + break + elif msg_type == 'stream': + stream = getattr(sys, content['name']) + stream.write(content['text']) + elif msg_type in ('display_data', 'execute_result', 'error'): + if in_kernel: + session.send(socket, msg_type, content, parent=parent_header) + else: + if msg_type == 'error': + print('\n'.join(content['traceback']), file=sys.stderr) + else: + sys.stdout.write(content['data'].get('text/plain', '')) + else: + pass + + # output is done, get the reply + while True: + if timeout is not None: + timeout = max(0, deadline - monotonic()) + try: + reply = self.get_shell_msg(timeout=timeout) + except Empty: + raise TimeoutError("Timeout waiting for reply") + if reply['parent_header'].get('msg_id') != msg_id: + # not my reply, someone may have forgotten to retrieve theirs + continue + return reply + diff --git a/jupyter_client/tests/test_client.py b/jupyter_client/tests/test_client.py new file mode 100644 index 000000000..87e3e6eb8 --- /dev/null +++ b/jupyter_client/tests/test_client.py @@ -0,0 +1,39 @@ +"""Tests for the KernelClient""" + +# Copyright (c) Jupyter Development Team. +# Distributed under the terms of the Modified BSD License. + + +import os +pjoin = os.path.join +from unittest import TestCase + +from nose import SkipTest + +from jupyter_client.kernelspec import KernelSpecManager, NoSuchKernel, NATIVE_KERNEL_NAME +from ..manager import start_new_kernel +from .utils import test_env + +from IPython.utils.capture import capture_output + +TIMEOUT = 30 + +class TestKernelClient(TestCase): + def setUp(self): + self.env_patch = test_env() + self.env_patch.start() + + def tearDown(self): + self.env_patch.stop() + + def test_run(self): + try: + KernelSpecManager().get_kernel_spec(NATIVE_KERNEL_NAME) + except NoSuchKernel: + raise SkipTest() + km, kc = start_new_kernel(kernel_name=NATIVE_KERNEL_NAME) + + with capture_output() as io: + reply = kc.run("print('hello')", timeout=TIMEOUT) + assert 'hello' in io.stdout + assert reply['content']['status'] == 'ok'