Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
davidbrochart committed Mar 15, 2021
1 parent 2d06cc5 commit 2b2b69b
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 97 deletions.
24 changes: 4 additions & 20 deletions jupyter_client/asynchronous/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,6 @@ class AsyncKernelClient(KernelClient):
get_stdin_msg = KernelClient._async_get_stdin_msg
get_control_msg = KernelClient._async_get_control_msg

#@property
#def hb_channel(self):
# """Get the hb channel object for this kernel."""
# if self._hb_channel is None:
# url = self._make_url('hb')
# self.log.debug("connecting heartbeat channel to %s", url)
# loop = asyncio.new_event_loop()
# self._hb_channel = self.hb_channel_class(
# self.context, self.session, url, loop
# )
# return self._hb_channel

wait_for_ready = KernelClient._async_wait_for_ready

# The classes to use for the various channels
Expand All @@ -45,7 +33,7 @@ class AsyncKernelClient(KernelClient):
control_channel_class = Type(ZMQSocketChannel)


_recv_reply = KernelClient._async__recv_reply
_recv_reply = KernelClient._async_recv_reply


# replies come on the shell channel
Expand All @@ -55,14 +43,10 @@ class AsyncKernelClient(KernelClient):
inspect = reqrep(KernelClient._async_inspect)
kernel_info = reqrep(KernelClient._async_kernel_info)
comm_info = reqrep(KernelClient._async_comm_info)

# replies come on the control channel
shutdown = reqrep(KernelClient._async_shutdown, channel='control')

is_alive = KernelClient._async_is_alive

execute_interactive = KernelClient._async_execute_interactive

stop_channels = KernelClient._async_stop_channels

channels_running = property(KernelClient._async_channels_running)

# replies come on the control channel
shutdown = reqrep(KernelClient._async_shutdown, channel='control')
12 changes: 4 additions & 8 deletions jupyter_client/blocking/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class BlockingKernelClient(KernelClient):
control_channel_class = Type(ZMQSocketChannel)


_recv_reply = run_sync(KernelClient._async__recv_reply)
_recv_reply = run_sync(KernelClient._async_recv_reply)


# replies come on the shell channel
Expand All @@ -47,14 +47,10 @@ class BlockingKernelClient(KernelClient):
inspect = run_sync(reqrep(KernelClient._async_inspect))
kernel_info = run_sync(reqrep(KernelClient._async_kernel_info))
comm_info = run_sync(reqrep(KernelClient._async_comm_info))

# replies come on the control channel
shutdown = run_sync(reqrep(KernelClient._async_shutdown, channel='control'))

is_alive = run_sync(KernelClient._async_is_alive)

execute_interactive = run_sync(KernelClient._async_execute_interactive)

stop_channels = run_sync(KernelClient._async_stop_channels)

channels_running = property(run_sync(KernelClient._async_channels_running))

# replies come on the control channel
shutdown = run_sync(reqrep(KernelClient._async_shutdown, channel='control'))
12 changes: 7 additions & 5 deletions jupyter_client/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class HBChannel(Thread):
_pause = None
_beating = None

def __init__(self, context=None, session=None, address=None, loop=None):
def __init__(self, context=None, session=None, address=None):
"""Create the heartbeat monitor thread.
Parameters
Expand All @@ -62,8 +62,6 @@ def __init__(self, context=None, session=None, address=None, loop=None):
super().__init__()
self.daemon = True

self.loop = loop

self.context = context
self.session = session
if isinstance(address, tuple):
Expand Down Expand Up @@ -93,6 +91,12 @@ def _create_socket(self):
# close previous socket, before opening a new one
self.poller.unregister(self.socket)
self.socket.close()
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

self.socket = self.context.socket(zmq.REQ)
self.socket.linger = 1000
self.socket.connect(self.address)
Expand Down Expand Up @@ -134,8 +138,6 @@ def _poll(self, start_time):

def run(self):
"""The thread's main activity. Call start() instead."""
if self.loop is not None:
asyncio.set_event_loop(self.loop)
self._create_socket()
self._running = True
self._beating = True
Expand Down
20 changes: 4 additions & 16 deletions jupyter_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ async def wrapped(self, *args, **kwargs):
if not reply:
return msg_id

return await self._async__recv_reply(msg_id, timeout=timeout, channel=channel)
return await self._async_recv_reply(msg_id, timeout=timeout, channel=channel)

if not meth.__doc__:
# python -OO removes docstrings,
Expand Down Expand Up @@ -199,7 +199,7 @@ async def _async_wait_for_ready(self, timeout=None):
except Empty:
break

async def _async__recv_reply(self, msg_id, timeout=None, channel='shell'):
async def _async_recv_reply(self, msg_id, timeout=None, channel='shell'):
"""Receive and return the reply for a given request"""
if timeout is not None:
deadline = time.monotonic() + timeout
Expand Down Expand Up @@ -359,23 +359,11 @@ def hb_channel(self):
if self._hb_channel is None:
url = self._make_url('hb')
self.log.debug("connecting heartbeat channel to %s", url)
loop = asyncio.new_event_loop()
self._hb_channel = self.hb_channel_class(
self.context, self.session, url, loop
self.context, self.session, url
)
return self._hb_channel

#@property
#def hb_channel(self):
# """Get the hb channel object for this kernel."""
# if self._hb_channel is None:
# url = self._make_url('hb')
# self.log.debug("connecting heartbeat channel to %s", url)
# self._hb_channel = self.hb_channel_class(
# self.context, self.session, url
# )
# return self._hb_channel

@property
def control_channel(self):
"""Get the control channel object for this kernel."""
Expand Down Expand Up @@ -540,7 +528,7 @@ async def _async_execute_interactive(self, code, silent=False, store_history=Tru
# output is done, get the reply
if timeout is not None:
timeout = max(0, deadline - time.monotonic())
return await self._async__recv_reply(msg_id, timeout=timeout)
return await self._async_recv_reply(msg_id, timeout=timeout)


# Methods to send specific messages on channels
Expand Down
38 changes: 19 additions & 19 deletions jupyter_client/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
Any, Float, Instance, Unicode, List, Bool, Type, DottedObjectName,
default, observe, observe_compat
)
from traitlets.utils.importstring import import_item
from traitlets.utils.importstring import import_item # type: ignore
from jupyter_client import (
launch_kernel,
kernelspec,
Expand All @@ -33,7 +33,7 @@
from .managerabc import (
KernelManagerABC
)
from .util import run_sync
from .util import run_sync, ensure_async

class _ShutdownStatus(Enum):
"""
Expand Down Expand Up @@ -272,7 +272,7 @@ def from_ns(match):

return [pat.sub(from_ns, arg) for arg in cmd]

async def _async__launch_kernel(
async def _async_launch_kernel(
self,
kernel_cmd: t.List[str],
**kw
Expand All @@ -283,7 +283,7 @@ async def _async__launch_kernel(
"""
return launch_kernel(kernel_cmd, **kw)

_launch_kernel = run_sync(_async__launch_kernel)
_launch_kernel = run_sync(_async_launch_kernel)

# Control socket used for polite kernel shutdown

Expand Down Expand Up @@ -380,7 +380,7 @@ async def _async_start_kernel(self, **kw):

# launch the kernel subprocess
self.log.debug("Starting kernel: %s", kernel_cmd)
self.kernel = await self._async__launch_kernel(kernel_cmd, **kw)
self.kernel = await ensure_async(self._launch_kernel(kernel_cmd, **kw))
self.post_start_kernel(**kw)

start_kernel = run_sync(_async_start_kernel)
Expand Down Expand Up @@ -417,7 +417,7 @@ async def _async_finish_shutdown(
except asyncio.TimeoutError:
self.log.debug("Kernel is taking too long to finish, terminating")
self._shutdown_status = _ShutdownStatus.SigtermRequest
await self._async__send_kernel_sigterm()
await self._async_send_kernel_sigterm()

try:
await asyncio.wait_for(
Expand All @@ -426,7 +426,7 @@ async def _async_finish_shutdown(
except asyncio.TimeoutError:
self.log.debug("Kernel is taking too long to finish, killing")
self._shutdown_status = _ShutdownStatus.SigkillRequest
await self._async__kill_kernel()
await ensure_async(self._kill_kernel())
else:
# Process is no longer alive, wait and clear
if self.kernel is not None:
Expand Down Expand Up @@ -485,16 +485,16 @@ async def _async_shutdown_kernel(
# Stop monitoring for restarting while we shutdown.
self.stop_restarter()

await self._async_interrupt_kernel()
await ensure_async(self.interrupt_kernel())

if now:
await self._async__kill_kernel()
await ensure_async(self._kill_kernel())
else:
self.request_shutdown(restart=restart)
# Don't send any additional kernel kill messages immediately, to give
# the kernel a chance to properly execute shutdown actions. Wait for at
# most 1s, checking every 0.1s.
await self._async_finish_shutdown()
await ensure_async(self.finish_shutdown())

# In 6.1.5, a new method, cleanup_resources(), was introduced to address
# a leak issue (https://github.com/jupyter/jupyter_client/pull/548) and
Expand Down Expand Up @@ -554,14 +554,14 @@ async def _async_restart_kernel(
"No previous call to 'start_kernel'.")
else:
# Stop currently running kernel.
await self._async_shutdown_kernel(now=now, restart=True)
await ensure_async(self.shutdown_kernel(now=now, restart=True))

if newports:
self.cleanup_random_ports()

# Start new kernel.
self._launch_args.update(kw)
await self._async_start_kernel(**self._launch_args)
await ensure_async(self.start_kernel(**self._launch_args))

restart_kernel = run_sync(_async_restart_kernel)

Expand All @@ -570,7 +570,7 @@ def has_kernel(self) -> bool:
"""Has a kernel been started that we are managing."""
return self.kernel is not None

async def _async__send_kernel_sigterm(self) -> None:
async def _async_send_kernel_sigterm(self) -> None:
"""similar to _kill_kernel, but with sigterm (not sigkill), but do not block"""
if self.has_kernel:
# Signal the kernel to terminate (sends SIGTERM on Unix and
Expand Down Expand Up @@ -600,9 +600,9 @@ async def _async__send_kernel_sigterm(self) -> None:
if e.errno != ESRCH:
raise

_send_kernel_sigterm = run_sync(_async__send_kernel_sigterm)
_send_kernel_sigterm = run_sync(_async_send_kernel_sigterm)

async def _async__kill_kernel(self) -> None:
async def _async_kill_kernel(self) -> None:
"""Kill the running kernel.
This is a private method, callers should use shutdown_kernel(now=True).
Expand Down Expand Up @@ -641,7 +641,7 @@ async def _async__kill_kernel(self) -> None:
self.kernel.wait()
self.kernel = None

_kill_kernel = run_sync(_async__kill_kernel)
_kill_kernel = run_sync(_async_kill_kernel)

async def _async_interrupt_kernel(self) -> None:
"""Interrupts the kernel by sending it a signal.
Expand Down Expand Up @@ -723,13 +723,13 @@ class AsyncKernelManager(KernelManager):
client_class: DottedObjectName = DottedObjectName('jupyter_client.asynchronous.AsyncKernelClient')
client_factory: Type = Type(klass='jupyter_client.asynchronous.AsyncKernelClient')

_launch_kernel = KernelManager._async__launch_kernel
_launch_kernel = KernelManager._async_launch_kernel
start_kernel = KernelManager._async_start_kernel
finish_shutdown = KernelManager._async_finish_shutdown
shutdown_kernel = KernelManager._async_shutdown_kernel
restart_kernel = KernelManager._async_restart_kernel
_send_kernel_sigterm = KernelManager._async__send_kernel_sigterm
_kill_kernel = KernelManager._async__kill_kernel
_send_kernel_sigterm = KernelManager._async_send_kernel_sigterm
_kill_kernel = KernelManager._async_kill_kernel
interrupt_kernel = KernelManager._async_interrupt_kernel
signal_kernel = KernelManager._async_signal_kernel
is_alive = KernelManager._async_is_alive
Expand Down
45 changes: 18 additions & 27 deletions jupyter_client/tests/test_kernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
import signal
import sys
import time
import threading
import multiprocessing as mp
import concurrent.futures
import pytest

import nest_asyncio
from async_generator import async_generator, yield_
from traitlets.config.loader import Config
from jupyter_core import paths
Expand Down Expand Up @@ -349,41 +349,32 @@ def test_start_parallel_thread_kernels(self, config, install_kernel):
pytest.skip("IPC transport is currently not working for this test!")
self._run_signaltest_lifecycle(config)

thread = threading.Thread(target=self._run_signaltest_lifecycle, args=(config,))
thread2 = threading.Thread(target=self._run_signaltest_lifecycle, args=(config,))
try:
thread.start()
thread2.start()
finally:
thread.join()
thread2.join()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as thread_executor:
future1 = thread_executor.submit(self._run_signaltest_lifecycle, config)
future2 = thread_executor.submit(self._run_signaltest_lifecycle, config)
future1.result()
future2.result()

@pytest.mark.timeout(TIMEOUT)
def test_start_parallel_process_kernels(self, config, install_kernel):
if config.KernelManager.transport == 'ipc': # FIXME
pytest.skip("IPC transport is currently not working for this test!")
self._run_signaltest_lifecycle(config)
thread = threading.Thread(target=self._run_signaltest_lifecycle, args=(config,))
proc = mp.Process(target=self._run_signaltest_lifecycle, args=(config,))
try:
thread.start()
proc.start()
finally:
thread.join()
proc.join()

assert proc.exitcode == 0
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as thread_executor:
future1 = thread_executor.submit(self._run_signaltest_lifecycle, config)
with concurrent.futures.ProcessPoolExecutor(max_workers=1) as process_executor:
future2 = process_executor.submit(self._run_signaltest_lifecycle, config)
future2.result()
future1.result()

@pytest.mark.timeout(TIMEOUT)
def test_start_sequence_process_kernels(self, config, install_kernel):
if config.KernelManager.transport == 'ipc': # FIXME
pytest.skip("IPC transport is currently not working for this test!")
self._run_signaltest_lifecycle(config)
proc = mp.Process(target=self._run_signaltest_lifecycle, args=(config,))
try:
proc.start()
finally:
proc.join()

assert proc.exitcode == 0
with concurrent.futures.ProcessPoolExecutor(max_workers=1) as pool_executor:
future = pool_executor.submit(self._run_signaltest_lifecycle, config)
future.result()

def _prepare_kernel(self, km, startup_timeout=TIMEOUT, **kwargs):
km.start_kernel(**kwargs)
Expand Down
7 changes: 5 additions & 2 deletions jupyter_client/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
import nest_asyncio
nest_asyncio.apply()

loop = asyncio.get_event_loop()

def run_sync(coro):
def wrapped(*args, **kwargs):
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop.run_until_complete(coro(*args, **kwargs))
wrapped.__doc__ = coro.__doc__
return wrapped
Expand Down

0 comments on commit 2b2b69b

Please sign in to comment.