diff --git a/.travis.yml b/.travis.yml index a2a8ffe14..4afdf8b1c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -13,6 +13,6 @@ install: - pip install -f travis-wheels/wheelhouse -e .[test] codecov - python -c 'import ipykernel.kernelspec; ipykernel.kernelspec.install(user=True)' script: - - nosetests --with-coverage --cover-package jupyter_client jupyter_client + - nosetests -v --with-coverage --cover-package jupyter_client jupyter_client after_success: - codecov diff --git a/jupyter_client/blocking/client.py b/jupyter_client/blocking/client.py index 37e83dbf1..16c022689 100644 --- a/jupyter_client/blocking/client.py +++ b/jupyter_client/blocking/client.py @@ -5,17 +5,69 @@ # Copyright (c) Jupyter Development Team. # Distributed under the terms of the Modified BSD License. +from __future__ import print_function + +from functools import partial +from getpass import getpass try: from queue import Empty # Python 3 except ImportError: from Queue import Empty # Python 2 +import sys import time +import zmq + from traitlets import Type from jupyter_client.channels import HBChannel from jupyter_client.client import KernelClient from .channels import ZMQSocketChannel +try: + monotonic = time.monotonic +except AttributeError: + # py2 + monotonic = time.time # close enough + +try: + TimeoutError +except NameError: + # py2 + TimeoutError = RuntimeError + + +def reqrep(meth): + def wrapped(self, *args, **kwargs): + reply = kwargs.pop('reply', False) + timeout = kwargs.pop('timeout', None) + msg_id = meth(self, *args, **kwargs) + if not reply: + return msg_id + + return self._recv_reply(msg_id, timeout=timeout) + + basedoc, _ = meth.__doc__.split('Returns\n', 1) + parts = [basedoc.strip()] + if 'Parameters' not in basedoc: + parts.append(""" + Parameters + ---------- + """) + parts.append(""" + reply: bool (default: False) + Whether to wait for and return reply + timeout: float or None (default: None) + Timeout to use when waiting for a reply + + Returns + ------- + msg_id: str + The msg_id of the request sent, if reply=False (default) + reply: dict + The reply message for this request, if reply=True + """) + wrapped.__doc__ = '\n'.join(parts) + return wrapped class BlockingKernelClient(KernelClient): """A BlockingKernelClient """ @@ -74,3 +126,212 @@ def wait_for_ready(self, timeout=None): iopub_channel_class = Type(ZMQSocketChannel) stdin_channel_class = Type(ZMQSocketChannel) hb_channel_class = Type(HBChannel) + + + def _recv_reply(self, msg_id, timeout=None): + """Receive and return the reply for a given request""" + if timeout is not None: + deadline = monotonic() + timeout + while True: + if timeout is not None: + timeout = max(0, deadline - monotonic()) + try: + reply = self.get_shell_msg(timeout=timeout) + except Empty: + raise TimeoutError("Timeout waiting for reply") + if reply['parent_header'].get('msg_id') != msg_id: + # not my reply, someone may have forgotten to retrieve theirs + continue + return reply + + + execute = reqrep(KernelClient.execute) + history = reqrep(KernelClient.history) + complete = reqrep(KernelClient.complete) + inspect = reqrep(KernelClient.inspect) + kernel_info = reqrep(KernelClient.kernel_info) + comm_info = reqrep(KernelClient.comm_info) + shutdown = reqrep(KernelClient.shutdown) + + + def _stdin_hook_default(self, msg): + """Handle an input request""" + content = msg['content'] + if content.get('password', False): + prompt = getpass + elif sys.version_info < (3,): + prompt = raw_input + else: + prompt = input + + try: + raw_data = prompt(content["prompt"]) + except EOFError: + # turn EOFError into EOF character + raw_data = '\x04' + except KeyboardInterrupt: + sys.stdout.write('\n') + return + + # only send stdin reply if there *was not* another request + # or execution finished while we were reading. + if not (self.stdin_channel.msg_ready() or self.shell_channel.msg_ready()): + self.input(raw_data) + + def _output_hook_default(self, msg): + """Default hook for redisplaying plain-text output""" + msg_type = msg['header']['msg_type'] + content = msg['content'] + if msg_type == 'stream': + stream = getattr(sys, content['name']) + stream.write(content['text']) + elif msg_type in ('display_data', 'execute_result'): + sys.stdout.write(content['data'].get('text/plain', '')) + elif msg_type == 'error': + print('\n'.join(content['traceback']), file=sys.stderr) + + def _output_hook_kernel(self, session, socket, parent_header, msg): + """Output hook when running inside an IPython kernel + + adds rich output support. + """ + msg_type = msg['header']['msg_type'] + if msg_type in ('display_data', 'execute_result', 'error'): + session.send(socket, msg_type, msg['content'], parent=parent_header) + else: + self._output_hook_default(msg) + + def execute_interactive(self, code, silent=False, store_history=True, + user_expressions=None, allow_stdin=None, stop_on_error=True, + timeout=None, output_hook=None, stdin_hook=None, + ): + """Execute code in the kernel interactively + + Output will be redisplayed, and stdin prompts will be relayed as well. + If an IPython kernel is detected, rich output will be displayed. + + You can pass a custom output_hook callable that will be called + with every IOPub message that is produced instead of the default redisplay. + + Parameters + ---------- + code : str + A string of code in the kernel's language. + + silent : bool, optional (default False) + If set, the kernel will execute the code as quietly possible, and + will force store_history to be False. + + store_history : bool, optional (default True) + If set, the kernel will store command history. This is forced + to be False if silent is True. + + user_expressions : dict, optional + A dict mapping names to expressions to be evaluated in the user's + dict. The expression values are returned as strings formatted using + :func:`repr`. + + allow_stdin : bool, optional (default self.allow_stdin) + Flag for whether the kernel can send stdin requests to frontends. + + Some frontends (e.g. the Notebook) do not support stdin requests. + If raw_input is called from code executed from such a frontend, a + StdinNotImplementedError will be raised. + + stop_on_error: bool, optional (default True) + Flag whether to abort the execution queue, if an exception is encountered. + + timeout: float or None (default: None) + Timeout to use when waiting for a reply + + output_hook: callable(msg) + Function to be called with output messages. + If not specified, output will be redisplayed. + + stdin_hook: callable(msg) + Function to be called with stdin_request messages. + If not specified, input/getpass will be called. + + Returns + ------- + reply: dict + The reply message for this request + """ + if not self.iopub_channel.is_alive(): + raise RuntimeError("IOPub channel must be running to receive output") + if allow_stdin is None: + allow_stdin = self.allow_stdin + if allow_stdin and not self.stdin_channel.is_alive(): + raise RuntimeError("stdin channel must be running to allow input") + msg_id = self.execute(code, + silent=silent, + store_history=store_history, + user_expressions=user_expressions, + allow_stdin=allow_stdin, + stop_on_error=stop_on_error, + ) + if stdin_hook is None: + stdin_hook = self._stdin_hook_default + if output_hook is None: + # detect IPython kernel + if 'IPython' in sys.modules: + from IPython import get_ipython + ip = get_ipython() + in_kernel = getattr(ip, 'kernel', False) + if in_kernel: + output_hook = partial( + self._output_hook_kernel, + ip.display_pub.session, + ip.display_pub.pub_socket, + ip.display_pub.parent_header, + ) + if output_hook is None: + # default: redisplay plain-text outputs + output_hook = self._output_hook_default + + # set deadline based on timeout + if timeout is not None: + deadline = monotonic() + timeout + else: + timeout_ms = None + + poller = zmq.Poller() + iopub_socket = self.iopub_channel.socket + poller.register(iopub_socket, zmq.POLLIN) + if allow_stdin: + stdin_socket = self.stdin_channel.socket + poller.register(stdin_socket, zmq.POLLIN) + else: + stdin_socket = None + + # wait for output and redisplay it + while True: + if timeout is not None: + timeout = max(0, deadline - monotonic()) + timeout_ms = 1e3 * timeout + events = dict(poller.poll(timeout_ms)) + if not events: + raise TimeoutError("Timeout waiting for output") + if stdin_socket in events: + req = self.stdin_channel.get_msg(timeout=0) + stdin_hook(req) + continue + if iopub_socket not in events: + continue + + msg = self.iopub_channel.get_msg(timeout=0) + + if msg['parent_header'].get('msg_id') != msg_id: + # not from my request + continue + output_hook(msg) + + # stop on idle + if msg['header']['msg_type'] == 'status' and \ + msg['content']['execution_state'] == 'idle': + break + + # output is done, get the reply + if timeout is not None: + timeout = max(0, deadline - monotonic()) + return self._recv_reply(msg_id, timeout=timeout) diff --git a/jupyter_client/client.py b/jupyter_client/client.py index 6b47f7dae..763af85a7 100644 --- a/jupyter_client/client.py +++ b/jupyter_client/client.py @@ -338,6 +338,9 @@ def history(self, raw=True, output=False, hist_access_type='range', **kwargs): ------- The ID of the message sent. """ + if hist_access_type == 'range': + kwargs.setdefault('session', 0) + kwargs.setdefault('start', 0) content = dict(raw=raw, output=output, hist_access_type=hist_access_type, **kwargs) msg = self.session.msg('history_request', content) @@ -345,13 +348,23 @@ def history(self, raw=True, output=False, hist_access_type='range', **kwargs): return msg['header']['msg_id'] def kernel_info(self): - """Request kernel info.""" + """Request kernel info + + Returns + ------- + The msg_id of the message sent + """ msg = self.session.msg('kernel_info_request') self.shell_channel.send(msg) return msg['header']['msg_id'] def comm_info(self, target_name=None): - """Request comm info.""" + """Request comm info + + Returns + ------- + The msg_id of the message sent + """ if target_name is None: content = {} else: @@ -380,6 +393,10 @@ def shutdown(self, restart=False): The kernel will send the reply via a function registered with Python's atexit module, ensuring it's truly done as the kernel is done with all normal operation. + + Returns + ------- + The msg_id of the message sent """ # Send quit message to kernel. Once we implement kernel-side setattr, # this should probably be done that way, but for now this will do. diff --git a/jupyter_client/tests/test_client.py b/jupyter_client/tests/test_client.py new file mode 100644 index 000000000..8375e6ae0 --- /dev/null +++ b/jupyter_client/tests/test_client.py @@ -0,0 +1,88 @@ +"""Tests for the KernelClient""" + +# Copyright (c) Jupyter Development Team. +# Distributed under the terms of the Modified BSD License. + + +import os +pjoin = os.path.join +from unittest import TestCase + +from nose import SkipTest + +from jupyter_client.kernelspec import KernelSpecManager, NoSuchKernel, NATIVE_KERNEL_NAME +from ..manager import start_new_kernel +from .utils import test_env + +from ipython_genutils.py3compat import string_types +from IPython.utils.capture import capture_output + +TIMEOUT = 30 + +class TestKernelClient(TestCase): + def setUp(self): + self.env_patch = test_env() + self.env_patch.start() + self.addCleanup(self.env_patch.stop) + try: + KernelSpecManager().get_kernel_spec(NATIVE_KERNEL_NAME) + except NoSuchKernel: + raise SkipTest() + self.km, self.kc = start_new_kernel(kernel_name=NATIVE_KERNEL_NAME) + self.addCleanup(self.kc.stop_channels) + self.addCleanup(self.km.shutdown_kernel) + + def test_execute_interactive(self): + kc = self.kc + + with capture_output() as io: + reply = kc.execute_interactive("print('hello')", timeout=TIMEOUT) + assert 'hello' in io.stdout + assert reply['content']['status'] == 'ok' + + def _check_reply(self, reply_type, reply): + self.assertIsInstance(reply, dict) + self.assertEqual(reply['header']['msg_type'], reply_type + '_reply') + self.assertEqual(reply['parent_header']['msg_type'], reply_type + '_request') + + def test_history(self): + kc = self.kc + msg_id = kc.history(session=0) + self.assertIsInstance(msg_id, string_types) + reply = kc.history(session=0, reply=True, timeout=TIMEOUT) + self._check_reply('history', reply) + + def test_inspect(self): + kc = self.kc + msg_id = kc.inspect('who cares') + self.assertIsInstance(msg_id, string_types) + reply = kc.inspect('code', reply=True, timeout=TIMEOUT) + self._check_reply('inspect', reply) + + def test_complete(self): + kc = self.kc + msg_id = kc.complete('who cares') + self.assertIsInstance(msg_id, string_types) + reply = kc.complete('code', reply=True, timeout=TIMEOUT) + self._check_reply('complete', reply) + + def test_kernel_info(self): + kc = self.kc + msg_id = kc.kernel_info() + self.assertIsInstance(msg_id, string_types) + reply = kc.kernel_info(reply=True, timeout=TIMEOUT) + self._check_reply('kernel_info', reply) + + def test_comm_info(self): + kc = self.kc + msg_id = kc.comm_info() + self.assertIsInstance(msg_id, string_types) + reply = kc.comm_info(reply=True, timeout=TIMEOUT) + self._check_reply('comm_info', reply) + + def test_shutdown(self): + kc = self.kc + msg_id = kc.shutdown() + self.assertIsInstance(msg_id, string_types) + reply = kc.shutdown(reply=True, timeout=TIMEOUT) + self._check_reply('shutdown', reply) diff --git a/setup.py b/setup.py index 35324211e..5d2e860d9 100644 --- a/setup.py +++ b/setup.py @@ -81,7 +81,7 @@ ] extras_require = setuptools_args['extras_require'] = { - 'test': 'ipykernel', + 'test': ['ipykernel', 'ipython'], } if 'setuptools' in sys.modules: