Skip to content

Commit

Permalink
Merge pull request #185 from minrk/KernelClient.run
Browse files Browse the repository at this point in the history
add BlockingKernelClient.execute_interactive
  • Loading branch information
minrk authored Sep 26, 2016
2 parents 4e9a953 + 01133c8 commit 863ead4
Show file tree
Hide file tree
Showing 5 changed files with 370 additions and 4 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ install:
- pip install -f travis-wheels/wheelhouse -e .[test] codecov
- python -c 'import ipykernel.kernelspec; ipykernel.kernelspec.install(user=True)'
script:
- nosetests --with-coverage --cover-package jupyter_client jupyter_client
- nosetests -v --with-coverage --cover-package jupyter_client jupyter_client
after_success:
- codecov
261 changes: 261 additions & 0 deletions jupyter_client/blocking/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,69 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.

from __future__ import print_function

from functools import partial
from getpass import getpass
try:
from queue import Empty # Python 3
except ImportError:
from Queue import Empty # Python 2
import sys
import time

import zmq

from traitlets import Type
from jupyter_client.channels import HBChannel
from jupyter_client.client import KernelClient
from .channels import ZMQSocketChannel

try:
monotonic = time.monotonic
except AttributeError:
# py2
monotonic = time.time # close enough

try:
TimeoutError
except NameError:
# py2
TimeoutError = RuntimeError


def reqrep(meth):
def wrapped(self, *args, **kwargs):
reply = kwargs.pop('reply', False)
timeout = kwargs.pop('timeout', None)
msg_id = meth(self, *args, **kwargs)
if not reply:
return msg_id

return self._recv_reply(msg_id, timeout=timeout)

basedoc, _ = meth.__doc__.split('Returns\n', 1)
parts = [basedoc.strip()]
if 'Parameters' not in basedoc:
parts.append("""
Parameters
----------
""")
parts.append("""
reply: bool (default: False)
Whether to wait for and return reply
timeout: float or None (default: None)
Timeout to use when waiting for a reply
Returns
-------
msg_id: str
The msg_id of the request sent, if reply=False (default)
reply: dict
The reply message for this request, if reply=True
""")
wrapped.__doc__ = '\n'.join(parts)
return wrapped

class BlockingKernelClient(KernelClient):
"""A BlockingKernelClient """
Expand Down Expand Up @@ -74,3 +126,212 @@ def wait_for_ready(self, timeout=None):
iopub_channel_class = Type(ZMQSocketChannel)
stdin_channel_class = Type(ZMQSocketChannel)
hb_channel_class = Type(HBChannel)


def _recv_reply(self, msg_id, timeout=None):
"""Receive and return the reply for a given request"""
if timeout is not None:
deadline = monotonic() + timeout
while True:
if timeout is not None:
timeout = max(0, deadline - monotonic())
try:
reply = self.get_shell_msg(timeout=timeout)
except Empty:
raise TimeoutError("Timeout waiting for reply")
if reply['parent_header'].get('msg_id') != msg_id:
# not my reply, someone may have forgotten to retrieve theirs
continue
return reply


execute = reqrep(KernelClient.execute)
history = reqrep(KernelClient.history)
complete = reqrep(KernelClient.complete)
inspect = reqrep(KernelClient.inspect)
kernel_info = reqrep(KernelClient.kernel_info)
comm_info = reqrep(KernelClient.comm_info)
shutdown = reqrep(KernelClient.shutdown)


def _stdin_hook_default(self, msg):
"""Handle an input request"""
content = msg['content']
if content.get('password', False):
prompt = getpass
elif sys.version_info < (3,):
prompt = raw_input
else:
prompt = input

try:
raw_data = prompt(content["prompt"])
except EOFError:
# turn EOFError into EOF character
raw_data = '\x04'
except KeyboardInterrupt:
sys.stdout.write('\n')
return

# only send stdin reply if there *was not* another request
# or execution finished while we were reading.
if not (self.stdin_channel.msg_ready() or self.shell_channel.msg_ready()):
self.input(raw_data)

def _output_hook_default(self, msg):
"""Default hook for redisplaying plain-text output"""
msg_type = msg['header']['msg_type']
content = msg['content']
if msg_type == 'stream':
stream = getattr(sys, content['name'])
stream.write(content['text'])
elif msg_type in ('display_data', 'execute_result'):
sys.stdout.write(content['data'].get('text/plain', ''))
elif msg_type == 'error':
print('\n'.join(content['traceback']), file=sys.stderr)

def _output_hook_kernel(self, session, socket, parent_header, msg):
"""Output hook when running inside an IPython kernel
adds rich output support.
"""
msg_type = msg['header']['msg_type']
if msg_type in ('display_data', 'execute_result', 'error'):
session.send(socket, msg_type, msg['content'], parent=parent_header)
else:
self._output_hook_default(msg)

def execute_interactive(self, code, silent=False, store_history=True,
user_expressions=None, allow_stdin=None, stop_on_error=True,
timeout=None, output_hook=None, stdin_hook=None,
):
"""Execute code in the kernel interactively
Output will be redisplayed, and stdin prompts will be relayed as well.
If an IPython kernel is detected, rich output will be displayed.
You can pass a custom output_hook callable that will be called
with every IOPub message that is produced instead of the default redisplay.
Parameters
----------
code : str
A string of code in the kernel's language.
silent : bool, optional (default False)
If set, the kernel will execute the code as quietly possible, and
will force store_history to be False.
store_history : bool, optional (default True)
If set, the kernel will store command history. This is forced
to be False if silent is True.
user_expressions : dict, optional
A dict mapping names to expressions to be evaluated in the user's
dict. The expression values are returned as strings formatted using
:func:`repr`.
allow_stdin : bool, optional (default self.allow_stdin)
Flag for whether the kernel can send stdin requests to frontends.
Some frontends (e.g. the Notebook) do not support stdin requests.
If raw_input is called from code executed from such a frontend, a
StdinNotImplementedError will be raised.
stop_on_error: bool, optional (default True)
Flag whether to abort the execution queue, if an exception is encountered.
timeout: float or None (default: None)
Timeout to use when waiting for a reply
output_hook: callable(msg)
Function to be called with output messages.
If not specified, output will be redisplayed.
stdin_hook: callable(msg)
Function to be called with stdin_request messages.
If not specified, input/getpass will be called.
Returns
-------
reply: dict
The reply message for this request
"""
if not self.iopub_channel.is_alive():
raise RuntimeError("IOPub channel must be running to receive output")
if allow_stdin is None:
allow_stdin = self.allow_stdin
if allow_stdin and not self.stdin_channel.is_alive():
raise RuntimeError("stdin channel must be running to allow input")
msg_id = self.execute(code,
silent=silent,
store_history=store_history,
user_expressions=user_expressions,
allow_stdin=allow_stdin,
stop_on_error=stop_on_error,
)
if stdin_hook is None:
stdin_hook = self._stdin_hook_default
if output_hook is None:
# detect IPython kernel
if 'IPython' in sys.modules:
from IPython import get_ipython
ip = get_ipython()
in_kernel = getattr(ip, 'kernel', False)
if in_kernel:
output_hook = partial(
self._output_hook_kernel,
ip.display_pub.session,
ip.display_pub.pub_socket,
ip.display_pub.parent_header,
)
if output_hook is None:
# default: redisplay plain-text outputs
output_hook = self._output_hook_default

# set deadline based on timeout
if timeout is not None:
deadline = monotonic() + timeout
else:
timeout_ms = None

poller = zmq.Poller()
iopub_socket = self.iopub_channel.socket
poller.register(iopub_socket, zmq.POLLIN)
if allow_stdin:
stdin_socket = self.stdin_channel.socket
poller.register(stdin_socket, zmq.POLLIN)
else:
stdin_socket = None

# wait for output and redisplay it
while True:
if timeout is not None:
timeout = max(0, deadline - monotonic())
timeout_ms = 1e3 * timeout
events = dict(poller.poll(timeout_ms))
if not events:
raise TimeoutError("Timeout waiting for output")
if stdin_socket in events:
req = self.stdin_channel.get_msg(timeout=0)
stdin_hook(req)
continue
if iopub_socket not in events:
continue

msg = self.iopub_channel.get_msg(timeout=0)

if msg['parent_header'].get('msg_id') != msg_id:
# not from my request
continue
output_hook(msg)

# stop on idle
if msg['header']['msg_type'] == 'status' and \
msg['content']['execution_state'] == 'idle':
break

# output is done, get the reply
if timeout is not None:
timeout = max(0, deadline - monotonic())
return self._recv_reply(msg_id, timeout=timeout)
21 changes: 19 additions & 2 deletions jupyter_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,20 +338,33 @@ def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
-------
The ID of the message sent.
"""
if hist_access_type == 'range':
kwargs.setdefault('session', 0)
kwargs.setdefault('start', 0)
content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
**kwargs)
msg = self.session.msg('history_request', content)
self.shell_channel.send(msg)
return msg['header']['msg_id']

def kernel_info(self):
"""Request kernel info."""
"""Request kernel info
Returns
-------
The msg_id of the message sent
"""
msg = self.session.msg('kernel_info_request')
self.shell_channel.send(msg)
return msg['header']['msg_id']

def comm_info(self, target_name=None):
"""Request comm info."""
"""Request comm info
Returns
-------
The msg_id of the message sent
"""
if target_name is None:
content = {}
else:
Expand Down Expand Up @@ -380,6 +393,10 @@ def shutdown(self, restart=False):
The kernel will send the reply via a function registered with Python's
atexit module, ensuring it's truly done as the kernel is done with all
normal operation.
Returns
-------
The msg_id of the message sent
"""
# Send quit message to kernel. Once we implement kernel-side setattr,
# this should probably be done that way, but for now this will do.
Expand Down
Loading

0 comments on commit 863ead4

Please sign in to comment.