Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ install:
- pip install -f travis-wheels/wheelhouse -e .[test] codecov
- python -c 'import ipykernel.kernelspec; ipykernel.kernelspec.install(user=True)'
script:
- nosetests --with-coverage --cover-package jupyter_client jupyter_client
- nosetests -v --with-coverage --cover-package jupyter_client jupyter_client
after_success:
- codecov
261 changes: 261 additions & 0 deletions jupyter_client/blocking/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,69 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.

from __future__ import print_function

from functools import partial
from getpass import getpass
try:
from queue import Empty # Python 3
except ImportError:
from Queue import Empty # Python 2
import sys
import time

import zmq

from traitlets import Type
from jupyter_client.channels import HBChannel
from jupyter_client.client import KernelClient
from .channels import ZMQSocketChannel

try:
monotonic = time.monotonic
except AttributeError:
# py2
monotonic = time.time # close enough

try:
TimeoutError
except NameError:
# py2
TimeoutError = RuntimeError


def reqrep(meth):
def wrapped(self, *args, **kwargs):
reply = kwargs.pop('reply', False)
timeout = kwargs.pop('timeout', None)
msg_id = meth(self, *args, **kwargs)
if not reply:
return msg_id

return self._recv_reply(msg_id, timeout=timeout)

basedoc, _ = meth.__doc__.split('Returns\n', 1)
parts = [basedoc.strip()]
if 'Parameters' not in basedoc:
parts.append("""
Parameters
----------
""")
parts.append("""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit leary of the docstring munging, because I don't think we'll remember it when editing docstrings - e.g. if we add a note below 'returns', it will be chopped off. Not a big deal, though.

Copy link
Member Author

@minrk minrk Aug 11, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, me, too. I couldn't think of a better way, though. If you have a less gross idea, I'm all for it.

The main reason I did this is that I can't see a good way to get a nice signature for the wrapped functions. If I had a good signature, I'd be happier with a simpler "See KernelClient.method for more details...". It's easy to inherit the wrapped method's signature exactly, but I didn't see a good way to do that and add the reply, timeout arguments.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're willing to depend on (or bundle) a backported version of the Py3 inspect.signature machinery for Py2, it should be relatively easy to add a couple of parameters to a signature and set the f.__signature__ attribute. IIRC, IPython's inspection will use that information even on Python 2, which is good enough for me.

reply: bool (default: False)
Whether to wait for and return reply
timeout: float or None (default: None)
Timeout to use when waiting for a reply

Returns
-------
msg_id: str
The msg_id of the request sent, if reply=False (default)
reply: dict
The reply message for this request, if reply=True
""")
wrapped.__doc__ = '\n'.join(parts)
return wrapped

class BlockingKernelClient(KernelClient):
"""A BlockingKernelClient """
Expand Down Expand Up @@ -74,3 +126,212 @@ def wait_for_ready(self, timeout=None):
iopub_channel_class = Type(ZMQSocketChannel)
stdin_channel_class = Type(ZMQSocketChannel)
hb_channel_class = Type(HBChannel)


def _recv_reply(self, msg_id, timeout=None):
"""Receive and return the reply for a given request"""
if timeout is not None:
deadline = monotonic() + timeout
while True:
if timeout is not None:
timeout = max(0, deadline - monotonic())
try:
reply = self.get_shell_msg(timeout=timeout)
except Empty:
raise TimeoutError("Timeout waiting for reply")
if reply['parent_header'].get('msg_id') != msg_id:
# not my reply, someone may have forgotten to retrieve theirs
continue
return reply


execute = reqrep(KernelClient.execute)
history = reqrep(KernelClient.history)
complete = reqrep(KernelClient.complete)
inspect = reqrep(KernelClient.inspect)
kernel_info = reqrep(KernelClient.kernel_info)
comm_info = reqrep(KernelClient.comm_info)
shutdown = reqrep(KernelClient.shutdown)


def _stdin_hook_default(self, msg):
"""Handle an input request"""
content = msg['content']
if content.get('password', False):
prompt = getpass
elif sys.version_info < (3,):
prompt = raw_input
else:
prompt = input

try:
raw_data = prompt(content["prompt"])
except EOFError:
# turn EOFError into EOF character
raw_data = '\x04'
except KeyboardInterrupt:
sys.stdout.write('\n')
return

# only send stdin reply if there *was not* another request
# or execution finished while we were reading.
if not (self.stdin_channel.msg_ready() or self.shell_channel.msg_ready()):
self.input(raw_data)

def _output_hook_default(self, msg):
"""Default hook for redisplaying plain-text output"""
msg_type = msg['header']['msg_type']
content = msg['content']
if msg_type == 'stream':
stream = getattr(sys, content['name'])
stream.write(content['text'])
elif msg_type in ('display_data', 'execute_result'):
sys.stdout.write(content['data'].get('text/plain', ''))
elif msg_type == 'error':
print('\n'.join(content['traceback']), file=sys.stderr)

def _output_hook_kernel(self, session, socket, parent_header, msg):
"""Output hook when running inside an IPython kernel

adds rich output support.
"""
msg_type = msg['header']['msg_type']
if msg_type in ('display_data', 'execute_result', 'error'):
session.send(socket, msg_type, msg['content'], parent=parent_header)
else:
self._output_hook_default(msg)

def execute_interactive(self, code, silent=False, store_history=True,
user_expressions=None, allow_stdin=None, stop_on_error=True,
timeout=None, output_hook=None, stdin_hook=None,
):
"""Execute code in the kernel interactively

Output will be redisplayed, and stdin prompts will be relayed as well.
If an IPython kernel is detected, rich output will be displayed.

You can pass a custom output_hook callable that will be called
with every IOPub message that is produced instead of the default redisplay.

Parameters
----------
code : str
A string of code in the kernel's language.

silent : bool, optional (default False)
If set, the kernel will execute the code as quietly possible, and
will force store_history to be False.

store_history : bool, optional (default True)
If set, the kernel will store command history. This is forced
to be False if silent is True.

user_expressions : dict, optional
A dict mapping names to expressions to be evaluated in the user's
dict. The expression values are returned as strings formatted using
:func:`repr`.

allow_stdin : bool, optional (default self.allow_stdin)
Flag for whether the kernel can send stdin requests to frontends.

Some frontends (e.g. the Notebook) do not support stdin requests.
If raw_input is called from code executed from such a frontend, a
StdinNotImplementedError will be raised.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should also mention that with reply=True, input requests will trigger terminal interaction using input/getpass.


stop_on_error: bool, optional (default True)
Flag whether to abort the execution queue, if an exception is encountered.

timeout: float or None (default: None)
Timeout to use when waiting for a reply

output_hook: callable(msg)
Function to be called with output messages.
If not specified, output will be redisplayed.

stdin_hook: callable(msg)
Function to be called with stdin_request messages.
If not specified, input/getpass will be called.

Returns
-------
reply: dict
The reply message for this request
"""
if not self.iopub_channel.is_alive():
raise RuntimeError("IOPub channel must be running to receive output")
if allow_stdin is None:
allow_stdin = self.allow_stdin
if allow_stdin and not self.stdin_channel.is_alive():
raise RuntimeError("stdin channel must be running to allow input")
msg_id = self.execute(code,
silent=silent,
store_history=store_history,
user_expressions=user_expressions,
allow_stdin=allow_stdin,
stop_on_error=stop_on_error,
)
if stdin_hook is None:
stdin_hook = self._stdin_hook_default
if output_hook is None:
# detect IPython kernel
if 'IPython' in sys.modules:
from IPython import get_ipython
ip = get_ipython()
in_kernel = getattr(ip, 'kernel', False)
if in_kernel:
output_hook = partial(
self._output_hook_kernel,
ip.display_pub.session,
ip.display_pub.pub_socket,
ip.display_pub.parent_header,
)
if output_hook is None:
# default: redisplay plain-text outputs
output_hook = self._output_hook_default

# set deadline based on timeout
if timeout is not None:
deadline = monotonic() + timeout
else:
timeout_ms = None

poller = zmq.Poller()
iopub_socket = self.iopub_channel.socket
poller.register(iopub_socket, zmq.POLLIN)
if allow_stdin:
stdin_socket = self.stdin_channel.socket
poller.register(stdin_socket, zmq.POLLIN)
else:
stdin_socket = None

# wait for output and redisplay it
while True:
if timeout is not None:
timeout = max(0, deadline - monotonic())
timeout_ms = 1e3 * timeout
events = dict(poller.poll(timeout_ms))
if not events:
raise TimeoutError("Timeout waiting for output")
if stdin_socket in events:
req = self.stdin_channel.get_msg(timeout=0)
stdin_hook(req)
continue
if iopub_socket not in events:
continue

msg = self.iopub_channel.get_msg(timeout=0)

if msg['parent_header'].get('msg_id') != msg_id:
# not from my request
continue
output_hook(msg)

# stop on idle
if msg['header']['msg_type'] == 'status' and \
msg['content']['execution_state'] == 'idle':
break

# output is done, get the reply
if timeout is not None:
timeout = max(0, deadline - monotonic())
return self._recv_reply(msg_id, timeout=timeout)
21 changes: 19 additions & 2 deletions jupyter_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,20 +338,33 @@ def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
-------
The ID of the message sent.
"""
if hist_access_type == 'range':
kwargs.setdefault('session', 0)
kwargs.setdefault('start', 0)
content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
**kwargs)
msg = self.session.msg('history_request', content)
self.shell_channel.send(msg)
return msg['header']['msg_id']

def kernel_info(self):
"""Request kernel info."""
"""Request kernel info

Returns
-------
The msg_id of the message sent
"""
msg = self.session.msg('kernel_info_request')
self.shell_channel.send(msg)
return msg['header']['msg_id']

def comm_info(self, target_name=None):
"""Request comm info."""
"""Request comm info

Returns
-------
The msg_id of the message sent
"""
if target_name is None:
content = {}
else:
Expand Down Expand Up @@ -380,6 +393,10 @@ def shutdown(self, restart=False):
The kernel will send the reply via a function registered with Python's
atexit module, ensuring it's truly done as the kernel is done with all
normal operation.

Returns
-------
The msg_id of the message sent
"""
# Send quit message to kernel. Once we implement kernel-side setattr,
# this should probably be done that way, but for now this will do.
Expand Down
Loading