Skip to content

Commit

Permalink
add reply=True arg for all of Client req/rep methods
Browse files Browse the repository at this point in the history
waits for and returns reply instead of msg_id
  • Loading branch information
minrk committed Aug 11, 2016
1 parent fdb9356 commit 9967225
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 35 deletions.
105 changes: 82 additions & 23 deletions jupyter_client/blocking/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,38 @@
# py2
TimeoutError = RuntimeError

def reqrep(meth):
def wrapped(self, *args, **kwargs):
reply = kwargs.pop('reply', False)
timeout = kwargs.pop('timeout', None)
msg_id = meth(self, *args, **kwargs)
if not reply:
return msg_id

return self._recv_reply(msg_id, timeout=timeout)

basedoc, _ = meth.__doc__.split('Returns\n', 1)
parts = [basedoc.strip()]
if 'Parameters' not in basedoc:
parts.append("""
Parameters
----------
""")
parts.append("""
reply: bool (default: False)
Whether to wait for and return reply
timeout: float or None (default: None)
Timeout to use when waiting for a reply
Returns
-------
msg_id: str
The msg_id of the request sent, if reply=False (default)
reply: dict
The reply message for this request, if reply=True
""")
wrapped.__doc__ = '\n'.join(parts)
return wrapped

class BlockingKernelClient(KernelClient):
"""A BlockingKernelClient """
Expand Down Expand Up @@ -89,14 +121,38 @@ def wait_for_ready(self, timeout=None):
stdin_channel_class = Type(ZMQSocketChannel)
hb_channel_class = Type(HBChannel)

def run(self, code, silent=False, store_history=True,

def _recv_reply(self, msg_id, timeout=None):
"""Receive and return the reply for a given request"""
if timeout is not None:
deadline = monotonic() + timeout
while True:
if timeout is not None:
timeout = max(0, deadline - monotonic())
try:
reply = self.get_shell_msg(timeout=timeout)
except Empty:
raise TimeoutError("Timeout waiting for reply")
if reply['parent_header'].get('msg_id') != msg_id:
# not my reply, someone may have forgotten to retrieve theirs
continue
return reply

history = reqrep(KernelClient.history)
complete = reqrep(KernelClient.complete)
inspect = reqrep(KernelClient.inspect)
kernel_info = reqrep(KernelClient.kernel_info)
comm_info = reqrep(KernelClient.comm_info)
shutdown = reqrep(KernelClient.shutdown)


def execute(self, code, silent=False, store_history=True,
user_expressions=None, stop_on_error=True,
timeout=None,
reply=False, timeout=None,
):
"""Run code in the kernel, redisplaying output.
"""Execute code in the kernel.
Wraps a call to `.execute`, capturing and redisplaying any output produced.
The execute_reply is returned.
If reply=True, wait for reply and redisplay output produced by the execution.
Parameters
----------
Expand All @@ -116,19 +172,32 @@ def run(self, code, silent=False, store_history=True,
dict. The expression values are returned as strings formatted using
:func:`repr`.
allow_stdin : bool, optional (default self.allow_stdin)
Flag for whether the kernel can send stdin requests to frontends.
Some frontends (e.g. the Notebook) do not support stdin requests.
If raw_input is called from code executed from such a frontend, a
StdinNotImplementedError will be raised.
stop_on_error: bool, optional (default True)
Flag whether to abort the execution queue, if an exception is encountered.
timeout: int or None (default None)
Timeout (in seconds) to wait for output. If None, wait forever.
reply: bool (default: False)
Whether to wait for and return reply
timeout: float or None (default: None)
Timeout to use when waiting for a reply
Returns
-------
msg_id: str
The msg_id of the request sent, if reply=False (default)
reply: dict
The execute_reply message.
The reply message for this request, if reply=True
"""
if not self.iopub_channel.is_alive():
if reply and not self.iopub_channel.is_alive():
raise RuntimeError("IOPub channel must be running to receive output")
msg_id = self.execute(code,
msg_id = super(BlockingKernelClient, self).execute(code,
silent=silent,
store_history=store_history,
user_expressions=user_expressions,
Expand All @@ -147,7 +216,6 @@ def run(self, code, silent=False, store_history=True,
in_kernel = False

# set deadline based on timeout
start = monotonic()
if timeout is not None:
deadline = monotonic() + timeout

Expand Down Expand Up @@ -184,15 +252,6 @@ def run(self, code, silent=False, store_history=True,
pass

# output is done, get the reply
while True:
if timeout is not None:
timeout = max(0, deadline - monotonic())
try:
reply = self.get_shell_msg(timeout=timeout)
except Empty:
raise TimeoutError("Timeout waiting for reply")
if reply['parent_header'].get('msg_id') != msg_id:
# not my reply, someone may have forgotten to retrieve theirs
continue
return reply

if timeout is not None:
timeout = max(0, deadline - monotonic())
return self._recv_reply(msg_id, timeout=timeout)
18 changes: 16 additions & 2 deletions jupyter_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,13 +345,23 @@ def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
return msg['header']['msg_id']

def kernel_info(self):
"""Request kernel info."""
"""Request kernel info
Returns
-------
The msg_id of the message sent
"""
msg = self.session.msg('kernel_info_request')
self.shell_channel.send(msg)
return msg['header']['msg_id']

def comm_info(self, target_name=None):
"""Request comm info."""
"""Request comm info
Returns
-------
The msg_id of the message sent
"""
if target_name is None:
content = {}
else:
Expand Down Expand Up @@ -380,6 +390,10 @@ def shutdown(self, restart=False):
The kernel will send the reply via a function registered with Python's
atexit module, ensuring it's truly done as the kernel is done with all
normal operation.
Returns
-------
The msg_id of the message sent
"""
# Send quit message to kernel. Once we implement kernel-side setattr,
# this should probably be done that way, but for now this will do.
Expand Down
65 changes: 56 additions & 9 deletions jupyter_client/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from ..manager import start_new_kernel
from .utils import test_env

from ipython_genutils.py3compat import string_types
from IPython.utils.capture import capture_output

TIMEOUT = 30
Expand All @@ -22,20 +23,66 @@ class TestKernelClient(TestCase):
def setUp(self):
self.env_patch = test_env()
self.env_patch.start()

def tearDown(self):
self.env_patch.stop()

def test_run(self):
self.addCleanup(self.env_patch.stop)
try:
KernelSpecManager().get_kernel_spec(NATIVE_KERNEL_NAME)
except NoSuchKernel:
raise SkipTest()
km, kc = start_new_kernel(kernel_name=NATIVE_KERNEL_NAME)
self.addCleanup(kc.stop_channels)
self.addCleanup(km.shutdown_kernel)
self.km, self.kc = start_new_kernel(kernel_name=NATIVE_KERNEL_NAME)
self.addCleanup(self.kc.stop_channels)
self.addCleanup(self.km.shutdown_kernel)

def test_execute(self):
kc = self.kc

with capture_output() as io:
reply = kc.run("print('hello')", timeout=TIMEOUT)
reply = kc.execute("print('hello')", reply=True, timeout=TIMEOUT)
assert 'hello' in io.stdout
assert reply['content']['status'] == 'ok'

def _check_reply(self, reply_type, reply):
self.assertIsInstance(reply, dict)
self.assertEqual(reply['header']['msg_type'], reply_type + '_reply')
self.assertEqual(reply['parent_header']['msg_type'], reply_type + '_request')

def test_history(self):
kc = self.kc
msg_id = kc.history(session=0)
self.assertIsInstance(msg_id, string_types)
reply = kc.history(session=0, reply=True, timeout=TIMEOUT)
self._check_reply('history', reply)

def test_inspect(self):
kc = self.kc
msg_id = kc.inspect('who cares')
self.assertIsInstance(msg_id, string_types)
reply = kc.inspect('code', reply=True, timeout=TIMEOUT)
self._check_reply('inspect', reply)

def test_complete(self):
kc = self.kc
msg_id = kc.complete('who cares')
self.assertIsInstance(msg_id, string_types)
reply = kc.complete('code', reply=True, timeout=TIMEOUT)
self._check_reply('complete', reply)

def test_kernel_info(self):
kc = self.kc
msg_id = kc.kernel_info()
self.assertIsInstance(msg_id, string_types)
reply = kc.kernel_info(reply=True, timeout=TIMEOUT)
self._check_reply('kernel_info', reply)

def test_comm_info(self):
kc = self.kc
msg_id = kc.comm_info()
self.assertIsInstance(msg_id, string_types)
reply = kc.comm_info(reply=True, timeout=TIMEOUT)
self._check_reply('comm_info', reply)

def test_shutdown(self):
kc = self.kc
msg_id = kc.shutdown()
self.assertIsInstance(msg_id, string_types)
reply = kc.shutdown(reply=True, timeout=TIMEOUT)
self._check_reply('shutdown', reply)
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
]

extras_require = setuptools_args['extras_require'] = {
'test': 'ipykernel',
'test': ['ipykernel', 'ipython'],
}

if 'setuptools' in sys.modules:
Expand Down

0 comments on commit 9967225

Please sign in to comment.