From 38168da2004d3ddaea717bbaafcb7293bd5faa94 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Tue, 9 Mar 2021 17:24:38 +0100 Subject: [PATCH] Refactor BlockingKernelManager/AsyncKernelManager --- jupyter_client/__init__.py | 2 +- jupyter_client/client.py | 16 +- jupyter_client/consoleapp.py | 6 +- jupyter_client/kernelapp.py | 8 +- jupyter_client/manager.py | 466 +++++------------- jupyter_client/tests/test_kernelmanager.py | 10 +- .../tests/test_multikernelmanager.py | 2 +- jupyter_client/tests/test_public_api.py | 4 +- jupyter_client/util.py | 10 + 9 files changed, 153 insertions(+), 371 deletions(-) create mode 100644 jupyter_client/util.py diff --git a/jupyter_client/__init__.py b/jupyter_client/__init__.py index f72c516d3..122010421 100644 --- a/jupyter_client/__init__.py +++ b/jupyter_client/__init__.py @@ -4,7 +4,7 @@ from .connect import * from .launcher import * from .client import KernelClient -from .manager import KernelManager, AsyncKernelManager, run_kernel +from .manager import KernelManager, BlockingKernelManager, AsyncKernelManager, run_kernel from .blocking import BlockingKernelClient from .asynchronous import AsyncKernelClient from .multikernelmanager import MultiKernelManager, AsyncMultiKernelManager diff --git a/jupyter_client/client.py b/jupyter_client/client.py index 760ac5266..7c8028680 100644 --- a/jupyter_client/client.py +++ b/jupyter_client/client.py @@ -3,11 +3,13 @@ # Copyright (c) Jupyter Development Team. # Distributed under the terms of the Modified BSD License. +import typing as t + from jupyter_client.channels import major_protocol_version import zmq -from traitlets import ( +from traitlets import ( # type: ignore Any, Instance, Type, ) @@ -19,11 +21,13 @@ # some utilities to validate message structure, these might get moved elsewhere # if they prove to have more generic utility -def validate_string_dict(dct): +def validate_string_dict( + dct: t.Dict[str, str] +) -> None: """Validate that the input is a dict with string keys and values. Raises ValueError if not.""" - for k,v in dct.items(): + for k, v in dct.items(): if not isinstance(k, str): raise ValueError('key %r in dict must be a string' % k) if not isinstance(v, str): @@ -49,7 +53,7 @@ class KernelClient(ConnectionFileMixin): # The PyZMQ Context to use for communication with the kernel. context = Instance(zmq.Context) - def _context_default(self): + def _context_default(self) -> zmq.Context: return zmq.Context() # The classes to use for the various channels @@ -67,13 +71,13 @@ def _context_default(self): _control_channel = Any() # flag for whether execute requests should be allowed to call raw_input: - allow_stdin = True + allow_stdin: bool = True #-------------------------------------------------------------------------- # Channel proxy methods #-------------------------------------------------------------------------- - def get_shell_msg(self, *args, **kwargs): + def get_shell_msg(self, *args, **kwargs) -> None: """Get a message from the shell channel""" return self.shell_channel.get_msg(*args, **kwargs) diff --git a/jupyter_client/consoleapp.py b/jupyter_client/consoleapp.py index 033b1fe2d..9fffdb48e 100644 --- a/jupyter_client/consoleapp.py +++ b/jupyter_client/consoleapp.py @@ -25,7 +25,7 @@ from .blocking import BlockingKernelClient from .restarter import KernelRestarter -from . import KernelManager, tunnel_to_kernel, find_connection_file, connect +from . import BlockingKernelManager, tunnel_to_kernel, find_connection_file, connect from .kernelspec import NoSuchKernel from .session import Session @@ -86,7 +86,7 @@ # Classes #----------------------------------------------------------------------------- -classes = [KernelManager, KernelRestarter, Session] +classes = [BlockingKernelManager, KernelRestarter, Session] class JupyterConsoleApp(ConnectionFileMixin): name = 'jupyter-console-mixin' @@ -112,7 +112,7 @@ class JupyterConsoleApp(ConnectionFileMixin): flags = Dict(flags) aliases = Dict(aliases) kernel_manager_class = Type( - default_value=KernelManager, + default_value=BlockingKernelManager, config=True, help='The kernel manager class to use.' ) diff --git a/jupyter_client/kernelapp.py b/jupyter_client/kernelapp.py index 33607049c..787cba9b1 100644 --- a/jupyter_client/kernelapp.py +++ b/jupyter_client/kernelapp.py @@ -8,7 +8,7 @@ from . import __version__ from .kernelspec import KernelSpecManager, NATIVE_KERNEL_NAME -from .manager import KernelManager +from .manager import BlockingKernelManager class KernelApp(JupyterApp): """Launch a kernel by name in a local subprocess. @@ -16,7 +16,7 @@ class KernelApp(JupyterApp): version = __version__ description = "Run a kernel locally in a subprocess" - classes = [KernelManager, KernelSpecManager] + classes = [BlockingKernelManager, KernelSpecManager] aliases = { 'kernel': 'KernelApp.kernel_name', @@ -33,8 +33,8 @@ def initialize(self, argv=None): cf_basename = 'kernel-%s.json' % uuid.uuid4() self.config.setdefault('KernelManager', {}).setdefault('connection_file', os.path.join(self.runtime_dir, cf_basename)) - self.km = KernelManager(kernel_name=self.kernel_name, - config=self.config) + self.km = BlockingKernelManager(kernel_name=self.kernel_name, + config=self.config) self.loop = IOLoop.current() self.loop.add_callback(self._record_started) diff --git a/jupyter_client/manager.py b/jupyter_client/manager.py index c1c0bca13..66d6995b8 100644 --- a/jupyter_client/manager.py +++ b/jupyter_client/manager.py @@ -12,7 +12,7 @@ import time import warnings from subprocess import Popen -import typing +import typing as t from enum import Enum @@ -33,6 +33,7 @@ from .managerabc import ( KernelManagerABC ) +from .util import run_sync class _ShutdownStatus(Enum): """ @@ -62,6 +63,8 @@ def __init__(self, *args, **kwargs): # The PyZMQ Context to use for communication with the kernel. context: Instance = Instance(zmq.Context) + + @default('context') def _context_default(self) -> zmq.Context: self._created_context = True return zmq.Context() @@ -69,14 +72,16 @@ def _context_default(self) -> zmq.Context: # the class to create with our `client` method client_class: DottedObjectName = DottedObjectName('jupyter_client.blocking.BlockingKernelClient') client_factory: Type = Type(klass='jupyter_client.KernelClient') + + @default('client_factory') def _client_factory_default(self) -> Type: return import_item(self.client_class) @observe('client_class') def _client_class_changed( self, - change: typing.Dict[str, DottedObjectName] - ) -> None: + change: t.Dict[str, DottedObjectName] + ) -> None: self.client_factory = import_item(str(change['new'])) # The kernel process with which the KernelManager is communicating. @@ -85,6 +90,7 @@ def _client_class_changed( kernel_spec_manager: Instance = Instance(kernelspec.KernelSpecManager) + @default('kernel_spec_manager') def _kernel_spec_manager_default(self) -> kernelspec.KernelSpecManager: return kernelspec.KernelSpecManager(data_dir=self.data_dir) @@ -92,8 +98,8 @@ def _kernel_spec_manager_default(self) -> kernelspec.KernelSpecManager: @observe_compat def _kernel_spec_manager_changed( self, - change: typing.Dict[str, Instance] - ) -> None: + change: t.Dict[str, Instance] + ) -> None: self._kernel_spec = None shutdown_wait_time: Float = Float( @@ -112,16 +118,16 @@ def _kernel_spec_manager_changed( @observe('kernel_name') def _kernel_name_changed( self, - change: typing.Dict[str, Unicode] - ) -> None: + change: t.Dict[str, Unicode] + ) -> None: self._kernel_spec = None if change['new'] == 'python': self.kernel_name = kernelspec.NATIVE_KERNEL_NAME - _kernel_spec: typing.Optional[kernelspec.KernelSpec] = None + _kernel_spec: t.Optional[kernelspec.KernelSpec] = None @property - def kernel_spec(self) -> typing.Optional[kernelspec.KernelSpec]: + def kernel_spec(self) -> t.Optional[kernelspec.KernelSpec]: if self._kernel_spec is None and self.kernel_name != '': self._kernel_spec = self.kernel_spec_manager.get_kernel_spec(self.kernel_name) return self._kernel_spec @@ -182,9 +188,9 @@ def stop_restarter(self) -> None: def add_restart_callback( self, - callback: typing.Callable, + callback: t.Callable, event: str = 'restart' - ) -> None: + ) -> None: """register a callback to be called when a kernel is restarted""" if self._restarter is None: return @@ -192,7 +198,7 @@ def add_restart_callback( def remove_restart_callback( self, - callback: typing.Callable, + callback: t.Callable, event: str ='restart' ) -> None: """unregister a callback to be called when a kernel is restarted""" @@ -223,10 +229,11 @@ def client(self, **kwargs) -> KernelClient: def format_kernel_cmd( self, - extra_arguments: typing.Optional[typing.List[str]] = None - ) -> typing.List[str]: + extra_arguments: t.Optional[t.List[str]] = None + ) -> t.List[str]: """replace templated args (e.g. {connection_file})""" extra_arguments = extra_arguments or [] + self.log.info(str(self.kernel_spec)) if self.kernel_cmd: cmd = self.kernel_cmd + extra_arguments else: @@ -265,17 +272,19 @@ def from_ns(match): return [pat.sub(from_ns, arg) for arg in cmd] - def _launch_kernel( + async def _async__launch_kernel( self, - kernel_cmd: typing.List[str], + kernel_cmd: t.List[str], **kw - ) -> typing.Union[Popen, typing.Coroutine[typing.Any, typing.Any, Popen]]: + ) -> Popen: """actually launch the kernel override in a subclass to launch kernel subprocesses differently """ return launch_kernel(kernel_cmd, **kw) + _launch_kernel = _async__launch_kernel + # Control socket used for polite kernel shutdown def _connect_control_socket(self) -> None: @@ -289,7 +298,7 @@ def _close_control_socket(self) -> None: self._control_socket.close() self._control_socket = None - def pre_start_kernel(self, **kw) -> typing.Tuple[typing.List[str], typing.Dict[str, typing.Any]]: + def pre_start_kernel(self, **kw) -> t.Tuple[t.List[str], t.Dict[str, t.Any]]: """Prepares a kernel for startup in a separate process. If random ports (port=0) are being used, this method must be called @@ -333,9 +342,9 @@ def pre_start_kernel(self, **kw) -> typing.Tuple[typing.List[str], typing.Dict[s def _get_env_substitutions( self, - templated_env: typing.Optional[typing.Dict[str, str]], - substitution_values: typing.Dict[str, str] - ) -> typing.Optional[typing.Dict[str, str]]: + templated_env: t.Optional[t.Dict[str, str]], + substitution_values: t.Dict[str, str] + ) -> t.Optional[t.Dict[str, str]]: """ Walks env entries in templated_env and applies possible substitutions from current env (represented by substitution_values). Returns the substituted list of env entries. @@ -355,7 +364,7 @@ def post_start_kernel(self, **kw) -> None: self.start_restarter() self._connect_control_socket() - def start_kernel(self, **kw): + async def _async_start_kernel(self, **kw): """Starts a kernel on this host in a separate process. If random ports (port=0) are being used, this method must be called @@ -371,13 +380,15 @@ def start_kernel(self, **kw): # launch the kernel subprocess self.log.debug("Starting kernel: %s", kernel_cmd) - self.kernel = self._launch_kernel(kernel_cmd, **kw) + self.kernel = await self._async__launch_kernel(kernel_cmd, **kw) self.post_start_kernel(**kw) + start_kernel = _async_start_kernel + def request_shutdown( self, restart: bool = False - ) -> None: + ) -> None: """Send a shutdown request via control channel """ content = dict(restart=restart) @@ -386,11 +397,11 @@ def request_shutdown( self._connect_control_socket() self.session.send(self._control_socket, msg) - def finish_shutdown( + async def _async_finish_shutdown( self, - waittime: typing.Optional[float] = None, + waittime: t.Optional[float] = None, pollinterval: float = 0.1 - ) -> None: + ) -> None: """Wait for kernel shutdown, then kill process if it doesn't shutdown. This does not send shutdown requests - use :meth:`request_shutdown` @@ -399,49 +410,35 @@ def finish_shutdown( if waittime is None: waittime = max(self.shutdown_wait_time, 0) self._shutdown_status = _ShutdownStatus.ShutdownRequest - - def poll_or_sleep_to_kernel_gone(): - """ - Poll until the kernel is not responding, - then wait (the subprocess), until process gone. - - After this function the kernel is either: - - still responding; or - - subprocess has been culled. - """ - if self.is_alive(): - time.sleep(pollinterval) - else: - # If there's still a proc, wait and clear - if self.has_kernel: - self.kernel.wait() - self.kernel = None - return True - - # wait 50% of the shutdown timeout... - for i in range(int(waittime / 2 / pollinterval)): - if poll_or_sleep_to_kernel_gone(): - break - else: - # if we've exited the loop normally (no break) - # send sigterm and wait the other 50%. + try: + await asyncio.wait_for( + self._async_wait(pollinterval=pollinterval), timeout=waittime / 2 + ) + except asyncio.TimeoutError: self.log.debug("Kernel is taking too long to finish, terminating") self._shutdown_status = _ShutdownStatus.SigtermRequest - self._send_kernel_sigterm() - for i in range(int(waittime / 2 / pollinterval)): - if poll_or_sleep_to_kernel_gone(): - break - else: - # OK, we've waited long enough. - if self.has_kernel: - self.log.debug("Kernel is taking too long to finish, killing") - self._shutdown_status = _ShutdownStatus.SigkillRequest - self._kill_kernel() + await self._async__send_kernel_sigterm() + + try: + await asyncio.wait_for( + self._async_wait(pollinterval=pollinterval), timeout=waittime / 2 + ) + except asyncio.TimeoutError: + self.log.debug("Kernel is taking too long to finish, killing") + self._shutdown_status = _ShutdownStatus.SigkillRequest + await self._async__kill_kernel() + else: + # Process is no longer alive, wait and clear + if self.kernel is not None: + self.kernel.wait() + self.kernel = None + + finish_shutdown = _async_finish_shutdown def cleanup_resources( self, restart: bool = False - ) -> None: + ) -> None: """Clean up resources when the kernel is shut down""" if not restart: self.cleanup_connection_file() @@ -456,13 +453,13 @@ def cleanup_resources( def cleanup( self, connection_file: bool = True - ) -> None: + ) -> None: """Clean up resources when the kernel is shut down""" warnings.warn("Method cleanup(connection_file=True) is deprecated, use cleanup_resources(restart=False).", FutureWarning) self.cleanup_resources(restart=not connection_file) - def shutdown_kernel( + async def _async_shutdown_kernel( self, now: bool = False, restart: bool = False @@ -488,16 +485,16 @@ def shutdown_kernel( # Stop monitoring for restarting while we shutdown. self.stop_restarter() - self.interrupt_kernel() + await self._async_interrupt_kernel() if now: - self._kill_kernel() + await self._async__kill_kernel() else: self.request_shutdown(restart=restart) # Don't send any additional kernel kill messages immediately, to give # the kernel a chance to properly execute shutdown actions. Wait for at # most 1s, checking every 0.1s. - self.finish_shutdown() + await self._async_finish_shutdown() # In 6.1.5, a new method, cleanup_resources(), was introduced to address # a leak issue (https://github.com/jupyter/jupyter_client/pull/548) and @@ -520,12 +517,14 @@ def shutdown_kernel( else: self.cleanup_resources(restart=restart) - def restart_kernel( + shutdown_kernel = _async_shutdown_kernel + + async def _async_restart_kernel( self, now: bool = False, newports: bool = False, **kw - ) -> None: + ) -> None: """Restarts a kernel with the arguments that were used to launch it. Parameters @@ -555,21 +554,23 @@ def restart_kernel( "No previous call to 'start_kernel'.") else: # Stop currently running kernel. - self.shutdown_kernel(now=now, restart=True) + await self._async_shutdown_kernel(now=now, restart=True) if newports: self.cleanup_random_ports() # Start new kernel. self._launch_args.update(kw) - self.start_kernel(**self._launch_args) + await self._async_start_kernel(**self._launch_args) + + restart_kernel = _async_restart_kernel @property def has_kernel(self) -> bool: """Has a kernel been started that we are managing.""" return self.kernel is not None - def _send_kernel_sigterm(self) -> None: + async def _async__send_kernel_sigterm(self) -> None: """similar to _kill_kernel, but with sigterm (not sigkill), but do not block""" if self.has_kernel: # Signal the kernel to terminate (sends SIGTERM on Unix and @@ -579,7 +580,7 @@ def _send_kernel_sigterm(self) -> None: if hasattr(self.kernel, "terminate"): self.kernel.terminate() elif hasattr(signal, "SIGTERM"): - self.signal_kernel(signal.SIGTERM) + await self._async_signal_kernel(signal.SIGTERM) else: self.log.debug( "Cannot set term signal to kernel, no" @@ -599,271 +600,9 @@ def _send_kernel_sigterm(self) -> None: if e.errno != ESRCH: raise - def _kill_kernel(self) -> None: - """Kill the running kernel. - - This is a private method, callers should use shutdown_kernel(now=True). - """ - if self.has_kernel: - # Signal the kernel to terminate (sends SIGKILL on Unix and calls - # TerminateProcess() on Win32). - try: - if hasattr(signal, 'SIGKILL'): - self.signal_kernel(signal.SIGKILL) # type: ignore - else: - self.kernel.kill() - except OSError as e: - # In Windows, we will get an Access Denied error if the process - # has already terminated. Ignore it. - if sys.platform == 'win32': - if e.winerror != 5: # type: ignore - raise - # On Unix, we may get an ESRCH error if the process has already - # terminated. Ignore it. - else: - from errno import ESRCH - if e.errno != ESRCH: - raise - - # Block until the kernel terminates. - self.kernel.wait() - self.kernel = None - - def interrupt_kernel(self) -> None: - """Interrupts the kernel by sending it a signal. - - Unlike ``signal_kernel``, this operation is well supported on all - platforms. - """ - if self.has_kernel: - assert self.kernel_spec is not None - interrupt_mode = self.kernel_spec.interrupt_mode - if interrupt_mode == 'signal': - if sys.platform == 'win32': - from .win_interrupt import send_interrupt - send_interrupt(self.kernel.win32_interrupt_event) - else: - self.signal_kernel(signal.SIGINT) - - elif interrupt_mode == 'message': - msg = self.session.msg("interrupt_request", content={}) - self._connect_control_socket() - self.session.send(self._control_socket, msg) - else: - raise RuntimeError("Cannot interrupt kernel. No kernel is running!") - - def signal_kernel( - self, - signum: int - ) -> None: - """Sends a signal to the process group of the kernel (this - usually includes the kernel and any subprocesses spawned by - the kernel). - - Note that since only SIGTERM is supported on Windows, this function is - only useful on Unix systems. - """ - if self.has_kernel: - if hasattr(os, "getpgid") and hasattr(os, "killpg"): - try: - pgid = os.getpgid(self.kernel.pid) # type: ignore - os.killpg(pgid, signum) # type: ignore - return - except OSError: - pass - self.kernel.send_signal(signum) - else: - raise RuntimeError("Cannot signal kernel. No kernel is running!") - - def is_alive(self) -> bool: - """Is the kernel process still running?""" - if self.has_kernel: - if self.kernel.poll() is None: - return True - else: - return False - else: - # we don't have a kernel - return False - - -class AsyncKernelManager(KernelManager): - """Manages kernels in an asynchronous manner """ - - client_class: DottedObjectName = DottedObjectName('jupyter_client.asynchronous.AsyncKernelClient') - client_factory: Type = Type(klass='jupyter_client.asynchronous.AsyncKernelClient') - - async def _launch_kernel(self, kernel_cmd, **kw): - """actually launch the kernel - - override in a subclass to launch kernel subprocesses differently - """ - return launch_kernel(kernel_cmd, **kw) - - async def start_kernel(self, **kw): - """Starts a kernel in a separate process in an asynchronous manner. - - If random ports (port=0) are being used, this method must be called - before the channels are created. - - Parameters - ---------- - `**kw` : optional - keyword arguments that are passed down to build the kernel_cmd - and launching the kernel (e.g. Popen kwargs). - """ - kernel_cmd, kw = self.pre_start_kernel(**kw) - - # launch the kernel subprocess - self.log.debug("Starting kernel (async): %s", kernel_cmd) - self.kernel = await self._launch_kernel(kernel_cmd, **kw) - self.post_start_kernel(**kw) - - async def finish_shutdown(self, waittime=None, pollinterval=0.1): - """Wait for kernel shutdown, then kill process if it doesn't shutdown. - - This does not send shutdown requests - use :meth:`request_shutdown` - first. - """ - if waittime is None: - waittime = max(self.shutdown_wait_time, 0) - self._shutdown_status = _ShutdownStatus.ShutdownRequest - try: - await asyncio.wait_for( - self._async_wait(pollinterval=pollinterval), timeout=waittime / 2 - ) - except asyncio.TimeoutError: - self.log.debug("Kernel is taking too long to finish, terminating") - self._shutdown_status = _ShutdownStatus.SigtermRequest - await self._send_kernel_sigterm() - - try: - await asyncio.wait_for( - self._async_wait(pollinterval=pollinterval), timeout=waittime / 2 - ) - except asyncio.TimeoutError: - self.log.debug("Kernel is taking too long to finish, killing") - self._shutdown_status = _ShutdownStatus.SigkillRequest - await self._kill_kernel() - else: - # Process is no longer alive, wait and clear - if self.kernel is not None: - self.kernel.wait() - self.kernel = None - - async def shutdown_kernel(self, now=False, restart=False): - """Attempts to stop the kernel process cleanly. - - This attempts to shutdown the kernels cleanly by: - - 1. Sending it a shutdown message over the shell channel. - 2. If that fails, the kernel is shutdown forcibly by sending it - a signal. - - Parameters - ---------- - now : bool - Should the kernel be forcible killed *now*. This skips the - first, nice shutdown attempt. - restart: bool - Will this kernel be restarted after it is shutdown. When this - is True, connection files will not be cleaned up. - """ - self.shutting_down = True # Used by restarter to prevent race condition - # Stop monitoring for restarting while we shutdown. - self.stop_restarter() - - await self.interrupt_kernel() - - if now: - await self._kill_kernel() - else: - self.request_shutdown(restart=restart) - # Don't send any additional kernel kill messages immediately, to give - # the kernel a chance to properly execute shutdown actions. Wait for at - # most 1s, checking every 0.1s. - await self.finish_shutdown() - - # See comment in KernelManager.shutdown_kernel(). - overrides_cleanup = type(self).cleanup is not AsyncKernelManager.cleanup - overrides_cleanup_resources = type(self).cleanup_resources is not AsyncKernelManager.cleanup_resources - - if overrides_cleanup and not overrides_cleanup_resources: - self.cleanup(connection_file=not restart) - else: - self.cleanup_resources(restart=restart) - - async def restart_kernel(self, now=False, newports=False, **kw): - """Restarts a kernel with the arguments that were used to launch it. - - Parameters - ---------- - now : bool, optional - If True, the kernel is forcefully restarted *immediately*, without - having a chance to do any cleanup action. Otherwise the kernel is - given 1s to clean up before a forceful restart is issued. - - In all cases the kernel is restarted, the only difference is whether - it is given a chance to perform a clean shutdown or not. - - newports : bool, optional - If the old kernel was launched with random ports, this flag decides - whether the same ports and connection file will be used again. - If False, the same ports and connection file are used. This is - the default. If True, new random port numbers are chosen and a - new connection file is written. It is still possible that the newly - chosen random port numbers happen to be the same as the old ones. - - `**kw` : optional - Any options specified here will overwrite those used to launch the - kernel. - """ - if self._launch_args is None: - raise RuntimeError("Cannot restart the kernel. " - "No previous call to 'start_kernel'.") - else: - # Stop currently running kernel. - await self.shutdown_kernel(now=now, restart=True) - - if newports: - self.cleanup_random_ports() - - # Start new kernel. - self._launch_args.update(kw) - await self.start_kernel(**self._launch_args) - return None - - async def _send_kernel_sigterm(self): - """similar to _kill_kernel, but with sigterm (not sigkill), but do not block""" - if self.has_kernel: - # Signal the kernel to terminate (sends SIGTERM on Unix and - # if the kernel is a subprocess and we are on windows; this is - # equivalent to kill - try: - if hasattr(self.kernel, "terminate"): - self.kernel.terminate() - elif hasattr(signal, "SIGTERM"): - await self.signal_kernel(signal.SIGTERM) - else: - self.log.debug( - "Cannot set term signal to kernel, no" - " `.terminate()` method and no values for SIGTERM" - ) - except OSError as e: - # In Windows, we will get an Access Denied error if the process - # has already terminated. Ignore it. - if sys.platform == "win32": - if e.winerror != 5: - raise - # On Unix, we may get an ESRCH error if the process has already - # terminated. Ignore it. - else: - from errno import ESRCH - - if e.errno != ESRCH: - raise + _send_kernel_sigterm = _async__send_kernel_sigterm - async def _kill_kernel(self): + async def _async__kill_kernel(self) -> None: """Kill the running kernel. This is a private method, callers should use shutdown_kernel(now=True). @@ -873,7 +612,7 @@ async def _kill_kernel(self): # TerminateProcess() on Win32). try: if hasattr(signal, 'SIGKILL'): - await self.signal_kernel(signal.SIGKILL) + await self._async_signal_kernel(signal.SIGKILL) else: self.kernel.kill() except OSError as e: @@ -902,7 +641,9 @@ async def _kill_kernel(self): self.kernel.wait() self.kernel = None - async def interrupt_kernel(self): + _kill_kernel = _async__kill_kernel + + async def _async_interrupt_kernel(self) -> None: """Interrupts the kernel by sending it a signal. Unlike ``signal_kernel``, this operation is well supported on all @@ -916,7 +657,7 @@ async def interrupt_kernel(self): from .win_interrupt import send_interrupt send_interrupt(self.kernel.win32_interrupt_event) else: - await self.signal_kernel(signal.SIGINT) + await self._async_signal_kernel(signal.SIGINT) elif interrupt_mode == 'message': msg = self.session.msg("interrupt_request", content={}) @@ -925,7 +666,12 @@ async def interrupt_kernel(self): else: raise RuntimeError("Cannot interrupt kernel. No kernel is running!") - async def signal_kernel(self, signum): + interrupt_kernel = _async_interrupt_kernel + + async def _async_signal_kernel( + self, + signum: int + ) -> None: """Sends a signal to the process group of the kernel (this usually includes the kernel and any subprocesses spawned by the kernel). @@ -945,7 +691,9 @@ async def signal_kernel(self, signum): else: raise RuntimeError("Cannot signal kernel. No kernel is running!") - async def is_alive(self): + signal_kernel = _async_signal_kernel + + async def _async_is_alive(self) -> bool: """Is the kernel process still running?""" if self.has_kernel: if self.kernel.poll() is None: @@ -956,6 +704,8 @@ async def is_alive(self): # we don't have a kernel return False + is_alive = _async_is_alive + async def _async_wait( self, pollinterval: float = 0.1 @@ -964,10 +714,28 @@ async def _async_wait( # not alive. If we find the process is no longer alive, complete # its cleanup via the blocking wait(). Callers are responsible for # issuing calls to wait() using a timeout (see _kill_kernel()). - while await self.is_alive(): + while await self._async_is_alive(): await asyncio.sleep(pollinterval) +class BlockingKernelManager(KernelManager): + _launch_kernel = run_sync(KernelManager._launch_kernel) + start_kernel = run_sync(KernelManager.start_kernel) + finish_shutdown = run_sync(KernelManager.finish_shutdown) + shutdown_kernel = run_sync(KernelManager.shutdown_kernel) + restart_kernel = run_sync(KernelManager.restart_kernel) + _send_kernel_sigterm = run_sync(KernelManager._send_kernel_sigterm) + _kill_kernel = run_sync(KernelManager._kill_kernel) + interrupt_kernel = run_sync(KernelManager.interrupt_kernel) + signal_kernel = run_sync(KernelManager.signal_kernel) + is_alive = run_sync(KernelManager.is_alive) + +class AsyncKernelManager(KernelManager): + # the class to create with our `client` method + client_class: DottedObjectName = DottedObjectName('jupyter_client.asynchronous.AsyncKernelClient') + client_factory: Type = Type(klass='jupyter_client.asynchronous.AsyncKernelClient') + + KernelManagerABC.register(KernelManager) @@ -975,9 +743,9 @@ def start_new_kernel( startup_timeout: float =60, kernel_name: str = 'python', **kwargs - ) -> typing.Tuple[KernelManager, KernelClient]: +) -> t.Tuple[BlockingKernelManager, KernelClient]: """Start a new kernel, and return its Manager and Client""" - km = KernelManager(kernel_name=kernel_name) + km = BlockingKernelManager(kernel_name=kernel_name) km.start_kernel(**kwargs) kc = km.client() kc.start_channels() @@ -995,7 +763,7 @@ async def start_new_async_kernel( startup_timeout: float = 60, kernel_name: str = 'python', **kwargs - ) -> typing.Tuple[KernelManager, KernelClient]: +) -> t.Tuple[AsyncKernelManager, KernelClient]: """Start a new kernel, and return its Manager and Client""" km = AsyncKernelManager(kernel_name=kernel_name) await km.start_kernel(**kwargs) @@ -1012,7 +780,7 @@ async def start_new_async_kernel( @contextmanager -def run_kernel(**kwargs) -> typing.Iterator[KernelClient]: +def run_kernel(**kwargs) -> t.Iterator[KernelClient]: """Context manager to create a kernel in a subprocess. The kernel is shut down when the context exits. diff --git a/jupyter_client/tests/test_kernelmanager.py b/jupyter_client/tests/test_kernelmanager.py index 7cb3876eb..e1654311b 100644 --- a/jupyter_client/tests/test_kernelmanager.py +++ b/jupyter_client/tests/test_kernelmanager.py @@ -17,7 +17,7 @@ from async_generator import async_generator, yield_ from traitlets.config.loader import Config from jupyter_core import paths -from jupyter_client import KernelManager, AsyncKernelManager +from jupyter_client import BlockingKernelManager, AsyncKernelManager from subprocess import PIPE from ..manager import start_new_kernel, start_new_async_kernel @@ -101,7 +101,7 @@ def start_kernel(): @pytest.fixture def km(config): - km = KernelManager(config=config) + km = BlockingKernelManager(config=config) return km @@ -186,7 +186,7 @@ def test_lifecycle(self, km): km.restart_kernel(now=True) assert km.is_alive() km.interrupt_kernel() - assert isinstance(km, KernelManager) + assert isinstance(km, BlockingKernelManager) km.shutdown_kernel(now=True) assert km.context.closed @@ -275,7 +275,7 @@ def test_cleanup_context(self, km): def test_no_cleanup_shared_context(self, zmq_context): """kernel manager does not terminate shared context""" - km = KernelManager(context=zmq_context) + km = BlockingKernelManager(context=zmq_context) assert km.context == zmq_context assert km.context is not None @@ -349,7 +349,7 @@ def _prepare_kernel(self, km, startup_timeout=TIMEOUT, **kwargs): return kc def _run_signaltest_lifecycle(self, config=None): - km = KernelManager(config=config, kernel_name='signaltest') + km = BlockingKernelManager(config=config, kernel_name='signaltest') kc = self._prepare_kernel(km, stdout=PIPE, stderr=PIPE) def execute(cmd): diff --git a/jupyter_client/tests/test_multikernelmanager.py b/jupyter_client/tests/test_multikernelmanager.py index 15393ae8a..51a0a196c 100644 --- a/jupyter_client/tests/test_multikernelmanager.py +++ b/jupyter_client/tests/test_multikernelmanager.py @@ -189,7 +189,7 @@ async def _run_lifecycle(km, test_kid=None): assert kid in km.list_kernel_ids() await km.interrupt_kernel(kid) k = km.get_kernel(kid) - assert isinstance(k, KernelManager) + assert isinstance(k, AsyncKernelManager) await km.shutdown_kernel(kid, now=True) assert kid not in km, f'{kid} not in {km}' diff --git a/jupyter_client/tests/test_public_api.py b/jupyter_client/tests/test_public_api.py index ab3883d66..d77679578 100644 --- a/jupyter_client/tests/test_public_api.py +++ b/jupyter_client/tests/test_public_api.py @@ -9,12 +9,12 @@ def test_kms(): - for base in ("", "Multi"): + for base in ("", "Blocking", "Async", "Multi"): KM = base + "KernelManager" assert KM in dir(jupyter_client) def test_kcs(): - for base in ("", "Blocking"): + for base in ("", "Blocking", "Async"): KM = base + "KernelClient" assert KM in dir(jupyter_client) diff --git a/jupyter_client/util.py b/jupyter_client/util.py new file mode 100644 index 000000000..e2465f1ac --- /dev/null +++ b/jupyter_client/util.py @@ -0,0 +1,10 @@ +import concurrent.futures +import asyncio + +def run_sync(coro): + def wrapped(*args, **kwargs): + with concurrent.futures.ThreadPoolExecutor() as executor: + future = executor.submit(asyncio.run, coro(*args, **kwargs)) + return future.result() + wrapped.__doc__ = coro.__doc__ + return wrapped