From 2b2b69ba46131c4948dd352d023cad9efcb56cfc Mon Sep 17 00:00:00 2001 From: David Brochart Date: Mon, 15 Mar 2021 09:08:48 +0100 Subject: [PATCH] Fix tests --- jupyter_client/asynchronous/client.py | 24 ++---------- jupyter_client/blocking/client.py | 12 ++---- jupyter_client/channels.py | 12 +++--- jupyter_client/client.py | 20 ++-------- jupyter_client/manager.py | 38 +++++++++--------- jupyter_client/tests/test_kernelmanager.py | 45 +++++++++------------- jupyter_client/util.py | 7 +++- 7 files changed, 61 insertions(+), 97 deletions(-) diff --git a/jupyter_client/asynchronous/client.py b/jupyter_client/asynchronous/client.py index 7a2632550..265cb0dde 100644 --- a/jupyter_client/asynchronous/client.py +++ b/jupyter_client/asynchronous/client.py @@ -23,18 +23,6 @@ class AsyncKernelClient(KernelClient): get_stdin_msg = KernelClient._async_get_stdin_msg get_control_msg = KernelClient._async_get_control_msg - #@property - #def hb_channel(self): - # """Get the hb channel object for this kernel.""" - # if self._hb_channel is None: - # url = self._make_url('hb') - # self.log.debug("connecting heartbeat channel to %s", url) - # loop = asyncio.new_event_loop() - # self._hb_channel = self.hb_channel_class( - # self.context, self.session, url, loop - # ) - # return self._hb_channel - wait_for_ready = KernelClient._async_wait_for_ready # The classes to use for the various channels @@ -45,7 +33,7 @@ class AsyncKernelClient(KernelClient): control_channel_class = Type(ZMQSocketChannel) - _recv_reply = KernelClient._async__recv_reply + _recv_reply = KernelClient._async_recv_reply # replies come on the shell channel @@ -55,14 +43,10 @@ class AsyncKernelClient(KernelClient): inspect = reqrep(KernelClient._async_inspect) kernel_info = reqrep(KernelClient._async_kernel_info) comm_info = reqrep(KernelClient._async_comm_info) - - # replies come on the control channel - shutdown = reqrep(KernelClient._async_shutdown, channel='control') - is_alive = KernelClient._async_is_alive - execute_interactive = KernelClient._async_execute_interactive - stop_channels = KernelClient._async_stop_channels - channels_running = property(KernelClient._async_channels_running) + + # replies come on the control channel + shutdown = reqrep(KernelClient._async_shutdown, channel='control') diff --git a/jupyter_client/blocking/client.py b/jupyter_client/blocking/client.py index ef92d2961..0eee0bcce 100644 --- a/jupyter_client/blocking/client.py +++ b/jupyter_client/blocking/client.py @@ -37,7 +37,7 @@ class BlockingKernelClient(KernelClient): control_channel_class = Type(ZMQSocketChannel) - _recv_reply = run_sync(KernelClient._async__recv_reply) + _recv_reply = run_sync(KernelClient._async_recv_reply) # replies come on the shell channel @@ -47,14 +47,10 @@ class BlockingKernelClient(KernelClient): inspect = run_sync(reqrep(KernelClient._async_inspect)) kernel_info = run_sync(reqrep(KernelClient._async_kernel_info)) comm_info = run_sync(reqrep(KernelClient._async_comm_info)) - - # replies come on the control channel - shutdown = run_sync(reqrep(KernelClient._async_shutdown, channel='control')) - is_alive = run_sync(KernelClient._async_is_alive) - execute_interactive = run_sync(KernelClient._async_execute_interactive) - stop_channels = run_sync(KernelClient._async_stop_channels) - channels_running = property(run_sync(KernelClient._async_channels_running)) + + # replies come on the control channel + shutdown = run_sync(reqrep(KernelClient._async_shutdown, channel='control')) diff --git a/jupyter_client/channels.py b/jupyter_client/channels.py index 5c14f627f..c5ff3791a 100644 --- a/jupyter_client/channels.py +++ b/jupyter_client/channels.py @@ -47,7 +47,7 @@ class HBChannel(Thread): _pause = None _beating = None - def __init__(self, context=None, session=None, address=None, loop=None): + def __init__(self, context=None, session=None, address=None): """Create the heartbeat monitor thread. Parameters @@ -62,8 +62,6 @@ def __init__(self, context=None, session=None, address=None, loop=None): super().__init__() self.daemon = True - self.loop = loop - self.context = context self.session = session if isinstance(address, tuple): @@ -93,6 +91,12 @@ def _create_socket(self): # close previous socket, before opening a new one self.poller.unregister(self.socket) self.socket.close() + try: + loop = asyncio.get_event_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + self.socket = self.context.socket(zmq.REQ) self.socket.linger = 1000 self.socket.connect(self.address) @@ -134,8 +138,6 @@ def _poll(self, start_time): def run(self): """The thread's main activity. Call start() instead.""" - if self.loop is not None: - asyncio.set_event_loop(self.loop) self._create_socket() self._running = True self._beating = True diff --git a/jupyter_client/client.py b/jupyter_client/client.py index 84652dc3e..48969aaaa 100644 --- a/jupyter_client/client.py +++ b/jupyter_client/client.py @@ -51,7 +51,7 @@ async def wrapped(self, *args, **kwargs): if not reply: return msg_id - return await self._async__recv_reply(msg_id, timeout=timeout, channel=channel) + return await self._async_recv_reply(msg_id, timeout=timeout, channel=channel) if not meth.__doc__: # python -OO removes docstrings, @@ -199,7 +199,7 @@ async def _async_wait_for_ready(self, timeout=None): except Empty: break - async def _async__recv_reply(self, msg_id, timeout=None, channel='shell'): + async def _async_recv_reply(self, msg_id, timeout=None, channel='shell'): """Receive and return the reply for a given request""" if timeout is not None: deadline = time.monotonic() + timeout @@ -359,23 +359,11 @@ def hb_channel(self): if self._hb_channel is None: url = self._make_url('hb') self.log.debug("connecting heartbeat channel to %s", url) - loop = asyncio.new_event_loop() self._hb_channel = self.hb_channel_class( - self.context, self.session, url, loop + self.context, self.session, url ) return self._hb_channel - #@property - #def hb_channel(self): - # """Get the hb channel object for this kernel.""" - # if self._hb_channel is None: - # url = self._make_url('hb') - # self.log.debug("connecting heartbeat channel to %s", url) - # self._hb_channel = self.hb_channel_class( - # self.context, self.session, url - # ) - # return self._hb_channel - @property def control_channel(self): """Get the control channel object for this kernel.""" @@ -540,7 +528,7 @@ async def _async_execute_interactive(self, code, silent=False, store_history=Tru # output is done, get the reply if timeout is not None: timeout = max(0, deadline - time.monotonic()) - return await self._async__recv_reply(msg_id, timeout=timeout) + return await self._async_recv_reply(msg_id, timeout=timeout) # Methods to send specific messages on channels diff --git a/jupyter_client/manager.py b/jupyter_client/manager.py index a66ecd576..1577ccde1 100644 --- a/jupyter_client/manager.py +++ b/jupyter_client/manager.py @@ -23,7 +23,7 @@ Any, Float, Instance, Unicode, List, Bool, Type, DottedObjectName, default, observe, observe_compat ) -from traitlets.utils.importstring import import_item +from traitlets.utils.importstring import import_item # type: ignore from jupyter_client import ( launch_kernel, kernelspec, @@ -33,7 +33,7 @@ from .managerabc import ( KernelManagerABC ) -from .util import run_sync +from .util import run_sync, ensure_async class _ShutdownStatus(Enum): """ @@ -272,7 +272,7 @@ def from_ns(match): return [pat.sub(from_ns, arg) for arg in cmd] - async def _async__launch_kernel( + async def _async_launch_kernel( self, kernel_cmd: t.List[str], **kw @@ -283,7 +283,7 @@ async def _async__launch_kernel( """ return launch_kernel(kernel_cmd, **kw) - _launch_kernel = run_sync(_async__launch_kernel) + _launch_kernel = run_sync(_async_launch_kernel) # Control socket used for polite kernel shutdown @@ -380,7 +380,7 @@ async def _async_start_kernel(self, **kw): # launch the kernel subprocess self.log.debug("Starting kernel: %s", kernel_cmd) - self.kernel = await self._async__launch_kernel(kernel_cmd, **kw) + self.kernel = await ensure_async(self._launch_kernel(kernel_cmd, **kw)) self.post_start_kernel(**kw) start_kernel = run_sync(_async_start_kernel) @@ -417,7 +417,7 @@ async def _async_finish_shutdown( except asyncio.TimeoutError: self.log.debug("Kernel is taking too long to finish, terminating") self._shutdown_status = _ShutdownStatus.SigtermRequest - await self._async__send_kernel_sigterm() + await self._async_send_kernel_sigterm() try: await asyncio.wait_for( @@ -426,7 +426,7 @@ async def _async_finish_shutdown( except asyncio.TimeoutError: self.log.debug("Kernel is taking too long to finish, killing") self._shutdown_status = _ShutdownStatus.SigkillRequest - await self._async__kill_kernel() + await ensure_async(self._kill_kernel()) else: # Process is no longer alive, wait and clear if self.kernel is not None: @@ -485,16 +485,16 @@ async def _async_shutdown_kernel( # Stop monitoring for restarting while we shutdown. self.stop_restarter() - await self._async_interrupt_kernel() + await ensure_async(self.interrupt_kernel()) if now: - await self._async__kill_kernel() + await ensure_async(self._kill_kernel()) else: self.request_shutdown(restart=restart) # Don't send any additional kernel kill messages immediately, to give # the kernel a chance to properly execute shutdown actions. Wait for at # most 1s, checking every 0.1s. - await self._async_finish_shutdown() + await ensure_async(self.finish_shutdown()) # In 6.1.5, a new method, cleanup_resources(), was introduced to address # a leak issue (https://github.com/jupyter/jupyter_client/pull/548) and @@ -554,14 +554,14 @@ async def _async_restart_kernel( "No previous call to 'start_kernel'.") else: # Stop currently running kernel. - await self._async_shutdown_kernel(now=now, restart=True) + await ensure_async(self.shutdown_kernel(now=now, restart=True)) if newports: self.cleanup_random_ports() # Start new kernel. self._launch_args.update(kw) - await self._async_start_kernel(**self._launch_args) + await ensure_async(self.start_kernel(**self._launch_args)) restart_kernel = run_sync(_async_restart_kernel) @@ -570,7 +570,7 @@ def has_kernel(self) -> bool: """Has a kernel been started that we are managing.""" return self.kernel is not None - async def _async__send_kernel_sigterm(self) -> None: + async def _async_send_kernel_sigterm(self) -> None: """similar to _kill_kernel, but with sigterm (not sigkill), but do not block""" if self.has_kernel: # Signal the kernel to terminate (sends SIGTERM on Unix and @@ -600,9 +600,9 @@ async def _async__send_kernel_sigterm(self) -> None: if e.errno != ESRCH: raise - _send_kernel_sigterm = run_sync(_async__send_kernel_sigterm) + _send_kernel_sigterm = run_sync(_async_send_kernel_sigterm) - async def _async__kill_kernel(self) -> None: + async def _async_kill_kernel(self) -> None: """Kill the running kernel. This is a private method, callers should use shutdown_kernel(now=True). @@ -641,7 +641,7 @@ async def _async__kill_kernel(self) -> None: self.kernel.wait() self.kernel = None - _kill_kernel = run_sync(_async__kill_kernel) + _kill_kernel = run_sync(_async_kill_kernel) async def _async_interrupt_kernel(self) -> None: """Interrupts the kernel by sending it a signal. @@ -723,13 +723,13 @@ class AsyncKernelManager(KernelManager): client_class: DottedObjectName = DottedObjectName('jupyter_client.asynchronous.AsyncKernelClient') client_factory: Type = Type(klass='jupyter_client.asynchronous.AsyncKernelClient') - _launch_kernel = KernelManager._async__launch_kernel + _launch_kernel = KernelManager._async_launch_kernel start_kernel = KernelManager._async_start_kernel finish_shutdown = KernelManager._async_finish_shutdown shutdown_kernel = KernelManager._async_shutdown_kernel restart_kernel = KernelManager._async_restart_kernel - _send_kernel_sigterm = KernelManager._async__send_kernel_sigterm - _kill_kernel = KernelManager._async__kill_kernel + _send_kernel_sigterm = KernelManager._async_send_kernel_sigterm + _kill_kernel = KernelManager._async_kill_kernel interrupt_kernel = KernelManager._async_interrupt_kernel signal_kernel = KernelManager._async_signal_kernel is_alive = KernelManager._async_is_alive diff --git a/jupyter_client/tests/test_kernelmanager.py b/jupyter_client/tests/test_kernelmanager.py index 9d20786f9..adf0b6456 100644 --- a/jupyter_client/tests/test_kernelmanager.py +++ b/jupyter_client/tests/test_kernelmanager.py @@ -10,10 +10,10 @@ import signal import sys import time -import threading -import multiprocessing as mp +import concurrent.futures import pytest +import nest_asyncio from async_generator import async_generator, yield_ from traitlets.config.loader import Config from jupyter_core import paths @@ -349,41 +349,32 @@ def test_start_parallel_thread_kernels(self, config, install_kernel): pytest.skip("IPC transport is currently not working for this test!") self._run_signaltest_lifecycle(config) - thread = threading.Thread(target=self._run_signaltest_lifecycle, args=(config,)) - thread2 = threading.Thread(target=self._run_signaltest_lifecycle, args=(config,)) - try: - thread.start() - thread2.start() - finally: - thread.join() - thread2.join() + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as thread_executor: + future1 = thread_executor.submit(self._run_signaltest_lifecycle, config) + future2 = thread_executor.submit(self._run_signaltest_lifecycle, config) + future1.result() + future2.result() @pytest.mark.timeout(TIMEOUT) def test_start_parallel_process_kernels(self, config, install_kernel): if config.KernelManager.transport == 'ipc': # FIXME pytest.skip("IPC transport is currently not working for this test!") self._run_signaltest_lifecycle(config) - thread = threading.Thread(target=self._run_signaltest_lifecycle, args=(config,)) - proc = mp.Process(target=self._run_signaltest_lifecycle, args=(config,)) - try: - thread.start() - proc.start() - finally: - thread.join() - proc.join() - - assert proc.exitcode == 0 + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as thread_executor: + future1 = thread_executor.submit(self._run_signaltest_lifecycle, config) + with concurrent.futures.ProcessPoolExecutor(max_workers=1) as process_executor: + future2 = process_executor.submit(self._run_signaltest_lifecycle, config) + future2.result() + future1.result() @pytest.mark.timeout(TIMEOUT) def test_start_sequence_process_kernels(self, config, install_kernel): + if config.KernelManager.transport == 'ipc': # FIXME + pytest.skip("IPC transport is currently not working for this test!") self._run_signaltest_lifecycle(config) - proc = mp.Process(target=self._run_signaltest_lifecycle, args=(config,)) - try: - proc.start() - finally: - proc.join() - - assert proc.exitcode == 0 + with concurrent.futures.ProcessPoolExecutor(max_workers=1) as pool_executor: + future = pool_executor.submit(self._run_signaltest_lifecycle, config) + future.result() def _prepare_kernel(self, km, startup_timeout=TIMEOUT, **kwargs): km.start_kernel(**kwargs) diff --git a/jupyter_client/util.py b/jupyter_client/util.py index bfbb8e968..d811009ec 100644 --- a/jupyter_client/util.py +++ b/jupyter_client/util.py @@ -3,10 +3,13 @@ import nest_asyncio nest_asyncio.apply() -loop = asyncio.get_event_loop() - def run_sync(coro): def wrapped(*args, **kwargs): + try: + loop = asyncio.get_event_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) return loop.run_until_complete(coro(*args, **kwargs)) wrapped.__doc__ = coro.__doc__ return wrapped