Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Contain usage of nest-asyncio #666

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions jupyter_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
from .connect import * # noqa
from .launcher import * # noqa
from .manager import AsyncKernelManager # noqa
from .manager import BlockingKernelManager # noqa
from .manager import KernelManager # noqa
from .manager import run_kernel # noqa
from .multikernelmanager import AsyncMultiKernelManager # noqa
from .multikernelmanager import BlockingMultiKernelManager # noqa
from .multikernelmanager import MultiKernelManager # noqa
14 changes: 7 additions & 7 deletions jupyter_client/ioloop/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from .restarter import AsyncIOLoopKernelRestarter
from .restarter import IOLoopKernelRestarter
from jupyter_client.manager import AsyncKernelManager
from jupyter_client.manager import KernelManager
from jupyter_client.manager import BlockingKernelManager


def as_zmqstream(f):
Expand All @@ -20,7 +20,7 @@ def wrapped(self, *args, **kwargs):
return wrapped


class IOLoopKernelManager(KernelManager):
class IOLoopKernelManager(BlockingKernelManager):

loop = Instance("tornado.ioloop.IOLoop")

Expand Down Expand Up @@ -52,11 +52,11 @@ def stop_restarter(self):
if self._restarter is not None:
self._restarter.stop()

connect_shell = as_zmqstream(KernelManager.connect_shell)
connect_control = as_zmqstream(KernelManager.connect_control)
connect_iopub = as_zmqstream(KernelManager.connect_iopub)
connect_stdin = as_zmqstream(KernelManager.connect_stdin)
connect_hb = as_zmqstream(KernelManager.connect_hb)
connect_shell = as_zmqstream(BlockingKernelManager.connect_shell)
connect_control = as_zmqstream(BlockingKernelManager.connect_control)
connect_iopub = as_zmqstream(BlockingKernelManager.connect_iopub)
connect_stdin = as_zmqstream(BlockingKernelManager.connect_stdin)
connect_hb = as_zmqstream(BlockingKernelManager.connect_hb)


class AsyncIOLoopKernelManager(AsyncKernelManager):
Expand Down
6 changes: 3 additions & 3 deletions jupyter_client/kernelapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from . import __version__
from .kernelspec import KernelSpecManager
from .kernelspec import NATIVE_KERNEL_NAME
from .manager import KernelManager
from .manager import BlockingKernelManager


class KernelApp(JupyterApp):
Expand All @@ -19,7 +19,7 @@ class KernelApp(JupyterApp):
version = __version__
description = "Run a kernel locally in a subprocess"

classes = [KernelManager, KernelSpecManager]
classes = [BlockingKernelManager, KernelSpecManager]

aliases = {
"kernel": "KernelApp.kernel_name",
Expand All @@ -38,7 +38,7 @@ def initialize(self, argv=None):
self.config.setdefault("KernelManager", {}).setdefault(
"connection_file", os.path.join(self.runtime_dir, cf_basename)
)
self.km = KernelManager(kernel_name=self.kernel_name, config=self.config)
self.km = BlockingKernelManager(kernel_name=self.kernel_name, config=self.config)

self.loop = IOLoop.current()
self.loop.add_callback(self._record_started)
Expand Down
58 changes: 27 additions & 31 deletions jupyter_client/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,7 @@ def _context_default(self) -> zmq.Context:
return zmq.Context()

# the class to create with our `client` method
client_class: DottedObjectName = DottedObjectName(
"jupyter_client.blocking.BlockingKernelClient"
)
client_class: DottedObjectName = DottedObjectName()
client_factory: Type = Type(klass="jupyter_client.KernelClient")

@default("client_factory")
Expand Down Expand Up @@ -249,8 +247,6 @@ async def _async_launch_kernel(self, kernel_cmd: t.List[str], **kw) -> Popen:
"""
return launch_kernel(kernel_cmd, **kw)

_launch_kernel = run_sync(_async_launch_kernel)

# Control socket used for polite kernel shutdown

def _connect_control_socket(self) -> None:
Expand Down Expand Up @@ -349,8 +345,6 @@ async def _async_start_kernel(self, **kw):
self.kernel = await ensure_async(self._launch_kernel(kernel_cmd, **kw))
self.post_start_kernel(**kw)

start_kernel = run_sync(_async_start_kernel)

def request_shutdown(self, restart: bool = False) -> None:
"""Send a shutdown request via control channel"""
content = dict(restart=restart)
Expand Down Expand Up @@ -394,8 +388,6 @@ async def _async_finish_shutdown(
await asyncio.sleep(pollinterval)
self.kernel = None

finish_shutdown = run_sync(_async_finish_shutdown)

def cleanup_resources(self, restart: bool = False) -> None:
"""Clean up resources when the kernel is shut down"""
if not restart:
Expand Down Expand Up @@ -443,8 +435,6 @@ async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False)

self.cleanup_resources(restart=restart)

shutdown_kernel = run_sync(_async_shutdown_kernel)

async def _async_restart_kernel(self, now: bool = False, newports: bool = False, **kw) -> None:
"""Restarts a kernel with the arguments that were used to launch it.

Expand Down Expand Up @@ -483,8 +473,6 @@ async def _async_restart_kernel(self, now: bool = False, newports: bool = False,
self._launch_args.update(kw)
await ensure_async(self.start_kernel(**self._launch_args))

restart_kernel = run_sync(_async_restart_kernel)

@property
def has_kernel(self) -> bool:
"""Has a kernel been started that we are managing."""
Expand Down Expand Up @@ -520,8 +508,6 @@ async def _async_send_kernel_sigterm(self) -> None:
if e.errno != ESRCH:
raise

_send_kernel_sigterm = run_sync(_async_send_kernel_sigterm)

async def _async_kill_kernel(self) -> None:
"""Kill the running kernel.

Expand Down Expand Up @@ -563,8 +549,6 @@ async def _async_kill_kernel(self) -> None:
await asyncio.sleep(0.1)
self.kernel = None

_kill_kernel = run_sync(_async_kill_kernel)

async def _async_interrupt_kernel(self) -> None:
"""Interrupts the kernel by sending it a signal.

Expand All @@ -589,8 +573,6 @@ async def _async_interrupt_kernel(self) -> None:
else:
raise RuntimeError("Cannot interrupt kernel. No kernel is running!")

interrupt_kernel = run_sync(_async_interrupt_kernel)

async def _async_signal_kernel(self, signum: int) -> None:
"""Sends a signal to the process group of the kernel (this
usually includes the kernel and any subprocesses spawned by
Expand All @@ -611,8 +593,6 @@ async def _async_signal_kernel(self, signum: int) -> None:
else:
raise RuntimeError("Cannot signal kernel. No kernel is running!")

signal_kernel = run_sync(_async_signal_kernel)

async def _async_is_alive(self) -> bool:
"""Is the kernel process still running?"""
if self.has_kernel:
Expand All @@ -624,8 +604,6 @@ async def _async_is_alive(self) -> bool:
# we don't have a kernel
return False

is_alive = run_sync(_async_is_alive)

async def _async_wait(self, pollinterval: float = 0.1) -> None:
# Use busy loop at 100ms intervals, polling until the process is
# not alive. If we find the process is no longer alive, complete
Expand All @@ -636,11 +614,9 @@ async def _async_wait(self, pollinterval: float = 0.1) -> None:


class AsyncKernelManager(KernelManager):
# the class to create with our `client` method
client_class: DottedObjectName = DottedObjectName(
"jupyter_client.asynchronous.AsyncKernelClient"
)
client_factory: Type = Type(klass="jupyter_client.asynchronous.AsyncKernelClient")
@default('client_class')
def _default_client_class(self):
return 'jupyter_client.asynchronous.AsyncKernelClient'

_launch_kernel = KernelManager._async_launch_kernel
start_kernel = KernelManager._async_start_kernel
Expand All @@ -654,14 +630,34 @@ class AsyncKernelManager(KernelManager):
is_alive = KernelManager._async_is_alive


KernelManagerABC.register(KernelManager)
KernelManagerABC.register(AsyncKernelManager)


class BlockingKernelManager(KernelManager):
@default('client_class')
def _default_client_class(self):
return 'jupyter_client.blocking.BlockingKernelClient'

is_alive = run_sync(KernelManager._async_is_alive)
signal_kernel = run_sync(KernelManager._async_signal_kernel)
interrupt_kernel = run_sync(KernelManager._async_interrupt_kernel)
_kill_kernel = run_sync(KernelManager._async_kill_kernel)
_send_kernel_sigterm = run_sync(KernelManager._async_send_kernel_sigterm)
restart_kernel = run_sync(KernelManager._async_restart_kernel)
shutdown_kernel = run_sync(KernelManager._async_shutdown_kernel)
finish_shutdown = run_sync(KernelManager._async_finish_shutdown)
_launch_kernel = run_sync(KernelManager._async_launch_kernel)
start_kernel = run_sync(KernelManager._async_start_kernel)


KernelManagerABC.register(BlockingKernelManager)


def start_new_kernel(
startup_timeout: float = 60, kernel_name: str = "python", **kwargs
) -> t.Tuple[KernelManager, KernelClient]:
) -> t.Tuple[BlockingKernelManager, KernelClient]:
"""Start a new kernel, and return its Manager and Client"""
km = KernelManager(kernel_name=kernel_name)
km = BlockingKernelManager(kernel_name=kernel_name)
km.start_kernel(**kwargs)
kc = km.client()
kc.start_channels()
Expand Down
40 changes: 21 additions & 19 deletions jupyter_client/multikernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def wrapped(self, kernel_id: str, *args, **kwargs) -> t.Union[t.Callable, t.Awai
return wrapped


class MultiKernelManager(LoggingConfigurable):
class _MultiKernelManager(LoggingConfigurable):
"""A class for managing multiple kernels."""

default_kernel_name = Unicode(
Expand All @@ -58,7 +58,6 @@ class MultiKernelManager(LoggingConfigurable):
kernel_spec_manager = Instance(KernelSpecManager, allow_none=True)

kernel_manager_class = DottedObjectName(
"jupyter_client.ioloop.IOLoopKernelManager",
config=True,
help="""The kernel manager class. This is configurable to allow
subclassing of the KernelManager for customized behavior.
Expand Down Expand Up @@ -218,8 +217,6 @@ async def _async_start_kernel(self, kernel_name: t.Optional[str] = None, **kwarg
del self._starting_kernels[kernel_id]
return kernel_id

start_kernel = run_sync(_async_start_kernel)

async def _async_shutdown_kernel(
self,
kernel_id: str,
Expand Down Expand Up @@ -256,8 +253,6 @@ async def _async_shutdown_kernel(
for port in ports:
self.currently_used_ports.remove(port)

shutdown_kernel = run_sync(_async_shutdown_kernel)

@kernel_method
def request_shutdown(self, kernel_id: str, restart: t.Optional[bool] = False) -> None:
"""Ask a kernel to shut down by its kernel uuid"""
Expand Down Expand Up @@ -300,8 +295,6 @@ async def _async_shutdown_all(self, now: bool = False) -> None:
]
await asyncio.gather(*futs)

shutdown_all = run_sync(_async_shutdown_all)

@kernel_method
def interrupt_kernel(self, kernel_id: str) -> None:
"""Interrupt (SIGINT) the kernel by its uuid.
Expand Down Expand Up @@ -487,16 +480,25 @@ def new_kernel_id(self, **kwargs) -> str:
return str(uuid.uuid4())


class AsyncMultiKernelManager(MultiKernelManager):
class AsyncMultiKernelManager(_MultiKernelManager):
@default('kernel_manager_class')
def _default_kernel_manager_class(self):
return 'jupyter_client.ioloop.AsyncIOLoopKernelManager'

start_kernel = _MultiKernelManager._async_start_kernel
shutdown_kernel = _MultiKernelManager._async_shutdown_kernel
shutdown_all = _MultiKernelManager._async_shutdown_all


class BlockingMultiKernelManager(_MultiKernelManager):
@default('kernel_manager_class')
def _default_kernel_manager_class(self):
return 'jupyter_client.ioloop.IOLoopKernelManager'

start_kernel = run_sync(_MultiKernelManager._async_start_kernel)
shutdown_kernel = run_sync(_MultiKernelManager._async_shutdown_kernel)
shutdown_all = run_sync(_MultiKernelManager._async_shutdown_all)

kernel_manager_class = DottedObjectName(
"jupyter_client.ioloop.AsyncIOLoopKernelManager",
config=True,
help="""The kernel manager class. This is configurable to allow
subclassing of the AsyncKernelManager for customized behavior.
""",
)

start_kernel = MultiKernelManager._async_start_kernel
shutdown_kernel = MultiKernelManager._async_shutdown_kernel
shutdown_all = MultiKernelManager._async_shutdown_all
# Backward compatibility
MultiKernelManager = BlockingMultiKernelManager
16 changes: 8 additions & 8 deletions jupyter_client/tests/test_kernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
from ..manager import start_new_async_kernel
from ..manager import start_new_kernel
from .utils import AsyncKMSubclass
from .utils import SyncKMSubclass
from .utils import BlockingKMSubclass
from .utils import test_env
from jupyter_client import AsyncKernelManager
from jupyter_client import KernelManager
from jupyter_client import BlockingKernelManager

pjoin = os.path.join

Expand Down Expand Up @@ -101,13 +101,13 @@ def start_kernel():

@pytest.fixture
def km(config):
km = KernelManager(config=config)
km = BlockingKernelManager(config=config)
return km


@pytest.fixture
def km_subclass(config):
km = SyncKMSubclass(config=config)
km = BlockingKMSubclass(config=config)
return km


Expand Down Expand Up @@ -200,7 +200,7 @@ def test_lifecycle(self, km):
km.restart_kernel(now=True)
assert km.is_alive()
km.interrupt_kernel()
assert isinstance(km, KernelManager)
assert isinstance(km, BlockingKernelManager)
km.shutdown_kernel(now=True)
assert km.context.closed

Expand Down Expand Up @@ -297,7 +297,7 @@ def test_cleanup_context(self, km):

def test_no_cleanup_shared_context(self, zmq_context):
"""kernel manager does not terminate shared context"""
km = KernelManager(context=zmq_context)
km = BlockingKernelManager(context=zmq_context)
assert km.context == zmq_context
assert km.context is not None

Expand Down Expand Up @@ -330,7 +330,7 @@ def test_subclass_callables(self, km_subclass):
km_subclass.interrupt_kernel()
assert km_subclass.call_count("interrupt_kernel") == 1

assert isinstance(km_subclass, KernelManager)
assert isinstance(km_subclass, BlockingKernelManager)

km_subclass.reset_counts()
km_subclass.shutdown_kernel(now=False)
Expand Down Expand Up @@ -408,7 +408,7 @@ def _prepare_kernel(self, km, startup_timeout=TIMEOUT, **kwargs):
return kc

def _run_signaltest_lifecycle(self, config=None):
km = KernelManager(config=config, kernel_name="signaltest")
km = BlockingKernelManager(config=config, kernel_name="signaltest")
kc = self._prepare_kernel(km, stdout=PIPE, stderr=PIPE)

def execute(cmd):
Expand Down
Loading