From 5babd84157d80097d8a366e8d0e80a4a0200bd8b Mon Sep 17 00:00:00 2001 From: David Brochart Date: Tue, 10 Mar 2020 18:04:15 +0100 Subject: [PATCH 01/12] Make nest_asyncio optional --- binder/run_nbclient.ipynb | 2 +- nbclient/client.py | 102 +++++++++++++------------------------- 2 files changed, 36 insertions(+), 68 deletions(-) diff --git a/binder/run_nbclient.ipynb b/binder/run_nbclient.ipynb index d8994b48..9f6f0584 100644 --- a/binder/run_nbclient.ipynb +++ b/binder/run_nbclient.ipynb @@ -46,7 +46,7 @@ "nb = nbf.read('./empty_notebook.ipynb', nbf.NO_CONVERT)\n", "\n", "# Execute our in-memory notebook, which will now have outputs\n", - "nb = nbclient.execute(nb)" + "nb = nbclient.execute(nb, nest_asyncio=True)" ] }, { diff --git a/nbclient/client.py b/nbclient/client.py index d196a339..4d747455 100644 --- a/nbclient/client.py +++ b/nbclient/client.py @@ -6,8 +6,6 @@ # contextlib, and we `await yield_()` instead of just `yield` from async_generator import asynccontextmanager, async_generator, yield_ -import nest_asyncio - from time import monotonic from queue import Empty import asyncio @@ -97,6 +95,21 @@ class NotebookClient(LoggingConfigurable): ), ).tag(config=True) + nest_asyncio = Bool( + False, + help=dedent( + """ + If False (default), then blocking functions such as `execute` + assume that no event loop is already running. These functions + run their async counterparts (e.g. `async_execute`) in an event + loop with `asyncio.run_until_complete`, which will fail if an + event loop is already running. This can be the case if nbclient + is used e.g. in a Jupyter Notebook. In that case, `nest_asyncio` + should be set to True. + """ + ), + ).tag(config=True) + force_raise_errors = Bool( False, help=dedent( @@ -367,21 +380,9 @@ async def setup_kernel(self, **kwargs): self.kc.stop_channels() self.kc = None - def execute(self, **kwargs): - """ - Executes each code cell (blocking). - - Returns - ------- - nb : NotebookNode - The executed notebook. - """ - loop = get_loop() - return loop.run_until_complete(self.async_execute(**kwargs)) - async def async_execute(self, **kwargs): """ - Executes each code cell asynchronously. + Executes each code cell. Returns ------- @@ -550,48 +551,9 @@ def _check_raise_for_error(self, cell, exec_reply): if (exec_reply is not None) and exec_reply['content']['status'] == 'error': raise CellExecutionError.from_cell_and_msg(cell, exec_reply['content']) - def execute_cell(self, cell, cell_index, execution_count=None, store_history=True): - """ - Executes a single code cell (blocking). - - To execute all cells see :meth:`execute`. - - Parameters - ---------- - cell : nbformat.NotebookNode - The cell which is currently being processed. - cell_index : int - The position of the cell within the notebook object. - execution_count : int - The execution count to be assigned to the cell (default: Use kernel response) - store_history : bool - Determines if history should be stored in the kernel (default: False). - Specific to ipython kernels, which can store command histories. - - Returns - ------- - output : dict - The execution output payload (or None for no output). - - Raises - ------ - CellExecutionError - If execution failed and should raise an exception, this will be raised - with defaults about the failure. - - Returns - ------- - cell : NotebookNode - The cell which was just processed. - """ - loop = get_loop() - return loop.run_until_complete( - self.async_execute_cell(cell, cell_index, execution_count, store_history) - ) - async def async_execute_cell(self, cell, cell_index, execution_count=None, store_history=True): """ - Executes a single code cell asynchronously. + Executes a single code cell. To execute all cells see :meth:`execute`. @@ -788,6 +750,24 @@ def _get_buffer_data(self, msg): return encoded_buffers +def make_blocking(async_method): + def blocking_method(self, *args, **kwargs): + try: + loop = asyncio.get_event_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + if self.nest_asyncio: + import nest_asyncio + nest_asyncio.apply(loop) + return loop.run_until_complete(async_method(self, *args, **kwargs)) + return blocking_method + + +NotebookClient.execute = make_blocking(NotebookClient.async_execute) +NotebookClient.execute_cell = make_blocking(NotebookClient.async_execute_cell) + + def execute(nb, cwd=None, km=None, **kwargs): """Execute a notebook's code, updating outputs within the notebook object. @@ -809,15 +789,3 @@ def execute(nb, cwd=None, km=None, **kwargs): if cwd is not None: resources['metadata'] = {'path': cwd} return NotebookClient(nb=nb, resources=resources, km=km, **kwargs).execute() - - -def get_loop(): - try: - loop = asyncio.get_event_loop() - except RuntimeError: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - return loop - - -nest_asyncio.apply() From 06f8ba2b45dda8cc2ee26cb869c8738df1170436 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Thu, 12 Mar 2020 12:57:21 +0100 Subject: [PATCH 02/12] Explicitly declare execute and execute_cell in NotebookClient class --- nbclient/client.py | 54 ++++++++++++++++++++++++++++++---------------- 1 file changed, 36 insertions(+), 18 deletions(-) diff --git a/nbclient/client.py b/nbclient/client.py index 4d747455..2261c9f6 100644 --- a/nbclient/client.py +++ b/nbclient/client.py @@ -294,6 +294,34 @@ def __init__(self, nb, km=None, **kw): self.km = km self.reset_execution_trackers() + def run_blocking(self, coro): + """Runs a coroutine and blocks until it has executed. + + An event loop is created if no one already exists. If an event loop is + already running, this event loop execution is nested into the already + running one if `nest_asyncio` is set to True. + + Parameters + ---------- + coro : coroutine + The coroutine to be executed. + + Returns + ------- + result : + Whatever the coroutine returns. + """ + try: + loop = asyncio.get_event_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + if self.nest_asyncio: + import nest_asyncio + nest_asyncio.apply(loop) + result = loop.run_until_complete(coro) + return result + def reset_execution_trackers(self): """Resets any per-execution trackers. """ @@ -380,6 +408,9 @@ async def setup_kernel(self, **kwargs): self.kc.stop_channels() self.kc = None + def execute(self, **kwargs): + return self.run_blocking(self.async_execute(**kwargs)) + async def async_execute(self, **kwargs): """ Executes each code cell. @@ -551,6 +582,11 @@ def _check_raise_for_error(self, cell, exec_reply): if (exec_reply is not None) and exec_reply['content']['status'] == 'error': raise CellExecutionError.from_cell_and_msg(cell, exec_reply['content']) + def execute_cell(self, cell, cell_index, execution_count=None, store_history=True): + return self.run_blocking( + self.async_execute_cell(cell, cell_index, execution_count, store_history) + ) + async def async_execute_cell(self, cell, cell_index, execution_count=None, store_history=True): """ Executes a single code cell. @@ -750,24 +786,6 @@ def _get_buffer_data(self, msg): return encoded_buffers -def make_blocking(async_method): - def blocking_method(self, *args, **kwargs): - try: - loop = asyncio.get_event_loop() - except RuntimeError: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - if self.nest_asyncio: - import nest_asyncio - nest_asyncio.apply(loop) - return loop.run_until_complete(async_method(self, *args, **kwargs)) - return blocking_method - - -NotebookClient.execute = make_blocking(NotebookClient.async_execute) -NotebookClient.execute_cell = make_blocking(NotebookClient.async_execute_cell) - - def execute(nb, cwd=None, km=None, **kwargs): """Execute a notebook's code, updating outputs within the notebook object. From 7bddf0a0eccd15b42da118b196e85f4060982f59 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Thu, 12 Mar 2020 16:22:24 +0100 Subject: [PATCH 03/12] Show meaningful message when nest_asyncio=True is needed --- nbclient/client.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/nbclient/client.py b/nbclient/client.py index 2261c9f6..a84573cc 100644 --- a/nbclient/client.py +++ b/nbclient/client.py @@ -319,7 +319,16 @@ def run_blocking(self, coro): if self.nest_asyncio: import nest_asyncio nest_asyncio.apply(loop) - result = loop.run_until_complete(coro) + try: + result = loop.run_until_complete(coro) + except RuntimeError as e: + if str(e) == 'This event loop is already running': + raise RuntimeError( + 'You are trying to run nbclient in an environment where an ' + 'event loop is already running. Please pass `nest_asyncio=True` in ' + '`NotebookClient.execute` and such methods.' + ) + raise return result def reset_execution_trackers(self): From 811ac3e4998e1c6e569477138c17baa75dcecb58 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Fri, 13 Mar 2020 12:10:30 +0100 Subject: [PATCH 04/12] Use AsyncKernelManager from jupyter_client --- nbclient/client.py | 30 ++++++++++++++-------------- nbclient/tests/fake_kernelmanager.py | 8 ++++---- nbclient/tests/test_client.py | 11 +++++++--- requirements.txt | 2 +- 4 files changed, 28 insertions(+), 23 deletions(-) diff --git a/nbclient/client.py b/nbclient/client.py index d196a339..11551c50 100644 --- a/nbclient/client.py +++ b/nbclient/client.py @@ -218,9 +218,9 @@ class NotebookClient(LoggingConfigurable): @default('kernel_manager_class') def _kernel_manager_class_default(self): """Use a dynamic default to avoid importing jupyter_client at startup""" - from jupyter_client import KernelManager + from jupyter_client import AsyncKernelManager - return KernelManager + return AsyncKernelManager _display_id_map = Dict( help=dedent( @@ -317,8 +317,8 @@ async def start_new_kernel_client(self, **kwargs): ---------- kwargs : Any options for `self.kernel_manager_class.start_kernel()`. Because - that defaults to KernelManager, this will likely include options - accepted by `KernelManager.start_kernel()``, which includes `cwd`. + that defaults to AsyncKernelManager, this will likely include options + accepted by `AsyncKernelManager.start_kernel()``, which includes `cwd`. Returns ------- @@ -332,7 +332,7 @@ async def start_new_kernel_client(self, **kwargs): if self.km.ipykernel and self.ipython_hist_file: self.extra_arguments += ['--HistoryManager.hist_file={}'.format(self.ipython_hist_file)] - self.km.start_kernel(extra_arguments=self.extra_arguments, **kwargs) + await self.km.start_kernel(extra_arguments=self.extra_arguments, **kwargs) self.kc = self.km.client() self.kc.start_channels() @@ -340,7 +340,7 @@ async def start_new_kernel_client(self, **kwargs): await self.kc.wait_for_ready(timeout=self.startup_timeout) except RuntimeError: self.kc.stop_channels() - self.km.shutdown_kernel() + await self.km.shutdown_kernel() raise self.kc.allow_stdin = False return self.kc @@ -470,8 +470,8 @@ async def _poll_for_reply(self, msg_id, cell, timeout, task_poll_output_msg): timeout = max(0, deadline - monotonic()) except Empty: # received no message, check if kernel is still alive - self._check_alive() - self._handle_timeout(timeout, cell) + await self._check_alive() + await self._handle_timeout(timeout, cell) async def _poll_output_msg(self, parent_msg_id, cell, cell_index): while True: @@ -494,18 +494,18 @@ def _get_timeout(self, cell): return timeout - def _handle_timeout(self, timeout, cell=None): + async def _handle_timeout(self, timeout, cell=None): self.log.error("Timeout waiting for execute reply (%is)." % timeout) if self.interrupt_on_timeout: self.log.error("Interrupting kernel") - self.km.interrupt_kernel() + await self.km.interrupt_kernel() else: raise CellTimeoutError.error_from_timeout_and_cell( "Cell execution timed out", timeout, cell ) - def _check_alive(self): - if not self.kc.is_alive(): + async def _check_alive(self): + if not await self.kc.is_alive(): self.log.error("Kernel died while waiting for execute reply.") raise DeadKernelError("Kernel died") @@ -518,10 +518,10 @@ async def _wait_for_reply(self, msg_id, cell=None): try: msg = await self.kc.shell_channel.get_msg(timeout=self.shell_timeout_interval) except Empty: - self._check_alive() + await self._check_alive() cummulative_time += self.shell_timeout_interval if timeout and cummulative_time > timeout: - self._handle_timeout(timeout, cell) + await self._handle_timeout(timeout, cell) break else: if msg['parent_header'].get('msg_id') == msg_id: @@ -800,7 +800,7 @@ def execute(nb, cwd=None, km=None, **kwargs): The notebook object to be executed cwd : str, optional If supplied, the kernel will run in this directory - km : KernelManager, optional + km : AsyncKernelManager, optional If supplied, the specified kernel manager will be used for code execution. kwargs : Any other options for ExecutePreprocessor, e.g. timeout, kernel_name diff --git a/nbclient/tests/fake_kernelmanager.py b/nbclient/tests/fake_kernelmanager.py index fb76a689..893f9176 100644 --- a/nbclient/tests/fake_kernelmanager.py +++ b/nbclient/tests/fake_kernelmanager.py @@ -1,7 +1,7 @@ -from jupyter_client.manager import KernelManager +from jupyter_client.manager import AsyncKernelManager -class FakeCustomKernelManager(KernelManager): +class FakeCustomKernelManager(AsyncKernelManager): expected_methods = {'__init__': 0, 'client': 0, 'start_kernel': 0} def __init__(self, *args, **kwargs): @@ -9,10 +9,10 @@ def __init__(self, *args, **kwargs): self.expected_methods['__init__'] += 1 super(FakeCustomKernelManager, self).__init__(*args, **kwargs) - def start_kernel(self, *args, **kwargs): + async def start_kernel(self, *args, **kwargs): self.log.info('FakeCustomKernelManager started a kernel') self.expected_methods['start_kernel'] += 1 - return super(FakeCustomKernelManager, self).start_kernel(*args, **kwargs) + return await super(FakeCustomKernelManager, self).start_kernel(*args, **kwargs) def client(self, *args, **kwargs): self.log.info('FakeCustomKernelManager created a client') diff --git a/nbclient/tests/test_client.py b/nbclient/tests/test_client.py index 988f8539..7cb1a7f6 100644 --- a/nbclient/tests/test_client.py +++ b/nbclient/tests/test_client.py @@ -144,7 +144,7 @@ def prepared_wrapper(func): def test_mock_wrapper(self): """ This inner function wrapper populates the executor object with - the fake kernel client. This client has it's iopub and shell + the fake kernel client. This client has its iopub and shell channels mocked so as to fake the setup handshake and return the messages passed into prepare_cell_mocks as the execute_cell loop processes them. @@ -161,6 +161,7 @@ def test_mock_wrapper(self): iopub_channel=MagicMock(get_msg=message_mock), shell_channel=MagicMock(get_msg=shell_channel_message_mock()), execute=MagicMock(return_value=parent_id), + is_alive=MagicMock(return_value=make_async(True)) ) executor.parent_id = parent_id return func(self, executor, cell_mock, message_mock) @@ -491,7 +492,7 @@ def test_kernel_death(self): km = executor.start_kernel_manager() with patch.object(km, "is_alive") as alive_mock: - alive_mock.return_value = False + alive_mock.return_value = make_async(False) # Will be a RuntimeError or subclass DeadKernelError depending # on if jupyter_client or nbconvert catches the dead client first with pytest.raises(RuntimeError): @@ -672,7 +673,11 @@ def test_busy_message(self, executor, cell_mock, message_mock): ) def test_deadline_exec_reply(self, executor, cell_mock, message_mock): # exec_reply is never received, so we expect to hit the timeout. - executor.kc.shell_channel.get_msg = MagicMock(side_effect=Empty()) + async def get_msg(timeout): + await asyncio.sleep(timeout) + raise Empty + + executor.kc.shell_channel.get_msg = get_msg executor.timeout = 1 with pytest.raises(TimeoutError): diff --git a/requirements.txt b/requirements.txt index 0a9e2ad2..6f129963 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ traitlets>=4.2 -jupyter_client>=6.0.0 +jupyter_client @ git+https://github.com/jupyter/jupyter_client@master nbformat>=5.0 async_generator nest_asyncio From 6663faeec18e0639016b2951c8cd84286ec66b6f Mon Sep 17 00:00:00 2001 From: David Brochart Date: Thu, 19 Mar 2020 11:03:38 +0100 Subject: [PATCH 05/12] Change run_blocking into run_sync --- nbclient/client.py | 58 ++++++----------------------------- nbclient/tests/test_client.py | 3 ++ nbclient/util.py | 47 ++++++++++++++++++++++++++++ requirements.txt | 2 +- 4 files changed, 60 insertions(+), 50 deletions(-) create mode 100644 nbclient/util.py diff --git a/nbclient/client.py b/nbclient/client.py index a84573cc..6f1f72e3 100644 --- a/nbclient/client.py +++ b/nbclient/client.py @@ -16,6 +16,7 @@ from nbformat.v4 import output_from_msg from .exceptions import CellTimeoutError, DeadKernelError, CellExecutionComplete, CellExecutionError +from .util import run_sync def timestamp(): @@ -294,43 +295,6 @@ def __init__(self, nb, km=None, **kw): self.km = km self.reset_execution_trackers() - def run_blocking(self, coro): - """Runs a coroutine and blocks until it has executed. - - An event loop is created if no one already exists. If an event loop is - already running, this event loop execution is nested into the already - running one if `nest_asyncio` is set to True. - - Parameters - ---------- - coro : coroutine - The coroutine to be executed. - - Returns - ------- - result : - Whatever the coroutine returns. - """ - try: - loop = asyncio.get_event_loop() - except RuntimeError: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - if self.nest_asyncio: - import nest_asyncio - nest_asyncio.apply(loop) - try: - result = loop.run_until_complete(coro) - except RuntimeError as e: - if str(e) == 'This event loop is already running': - raise RuntimeError( - 'You are trying to run nbclient in an environment where an ' - 'event loop is already running. Please pass `nest_asyncio=True` in ' - '`NotebookClient.execute` and such methods.' - ) - raise - return result - def reset_execution_trackers(self): """Resets any per-execution trackers. """ @@ -417,9 +381,6 @@ async def setup_kernel(self, **kwargs): self.kc.stop_channels() self.kc = None - def execute(self, **kwargs): - return self.run_blocking(self.async_execute(**kwargs)) - async def async_execute(self, **kwargs): """ Executes each code cell. @@ -445,6 +406,8 @@ async def async_execute(self, **kwargs): return self.nb + execute = run_sync(async_execute) + def set_widgets_metadata(self): if self.widget_state: self.nb.metadata.widgets = { @@ -511,7 +474,7 @@ async def _poll_for_reply(self, msg_id, cell, timeout, task_poll_output_msg): timeout = max(0, deadline - monotonic()) except Empty: # received no message, check if kernel is still alive - self._check_alive() + await self._check_alive() self._handle_timeout(timeout, cell) async def _poll_output_msg(self, parent_msg_id, cell, cell_index): @@ -545,8 +508,8 @@ def _handle_timeout(self, timeout, cell=None): "Cell execution timed out", timeout, cell ) - def _check_alive(self): - if not self.kc.is_alive(): + async def _check_alive(self): + if not await self.kc.is_alive(): self.log.error("Kernel died while waiting for execute reply.") raise DeadKernelError("Kernel died") @@ -559,7 +522,7 @@ async def _wait_for_reply(self, msg_id, cell=None): try: msg = await self.kc.shell_channel.get_msg(timeout=self.shell_timeout_interval) except Empty: - self._check_alive() + await self._check_alive() cummulative_time += self.shell_timeout_interval if timeout and cummulative_time > timeout: self._handle_timeout(timeout, cell) @@ -591,11 +554,6 @@ def _check_raise_for_error(self, cell, exec_reply): if (exec_reply is not None) and exec_reply['content']['status'] == 'error': raise CellExecutionError.from_cell_and_msg(cell, exec_reply['content']) - def execute_cell(self, cell, cell_index, execution_count=None, store_history=True): - return self.run_blocking( - self.async_execute_cell(cell, cell_index, execution_count, store_history) - ) - async def async_execute_cell(self, cell, cell_index, execution_count=None, store_history=True): """ Executes a single code cell. @@ -661,6 +619,8 @@ async def async_execute_cell(self, cell, cell_index, execution_count=None, store self.nb['cells'][cell_index] = cell return cell + execute_cell = run_sync(async_execute_cell) + def process_message(self, msg, cell, cell_index): """ Processes a kernel message, updates cell state, and returns the diff --git a/nbclient/tests/test_client.py b/nbclient/tests/test_client.py index 988f8539..d88f6ba1 100644 --- a/nbclient/tests/test_client.py +++ b/nbclient/tests/test_client.py @@ -672,6 +672,9 @@ def test_busy_message(self, executor, cell_mock, message_mock): ) def test_deadline_exec_reply(self, executor, cell_mock, message_mock): # exec_reply is never received, so we expect to hit the timeout. + async def is_alive(): + return True + executor.kc.is_alive = is_alive executor.kc.shell_channel.get_msg = MagicMock(side_effect=Empty()) executor.timeout = 1 diff --git a/nbclient/util.py b/nbclient/util.py new file mode 100644 index 00000000..1a274792 --- /dev/null +++ b/nbclient/util.py @@ -0,0 +1,47 @@ +"""General utility methods""" + +# Copyright (c) Jupyter Development Team. +# Distributed under the terms of the Modified BSD License. + +import asyncio + + +def run_sync(coro): + """Runs a coroutine and blocks until it has executed. + + An event loop is created if no one already exists. If an event loop is + already running, this event loop execution is nested into the already + running one if `nest_asyncio` is set to True. + + Parameters + ---------- + coro : coroutine + The coroutine to be executed. + + Returns + ------- + result : + Whatever the coroutine returns. + """ + def wrapped(self, *args, **kwargs): + try: + loop = asyncio.get_event_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + if self.nest_asyncio: + import nest_asyncio + nest_asyncio.apply(loop) + try: + result = loop.run_until_complete(coro(self, *args, **kwargs)) + except RuntimeError as e: + if str(e) == 'This event loop is already running': + raise RuntimeError( + 'You are trying to run nbclient in an environment where an ' + 'event loop is already running. Please pass `nest_asyncio=True` in ' + '`NotebookClient.execute` and such methods.' + ) + raise + return result + wrapped.__doc__ = coro.__doc__ + return wrapped diff --git a/requirements.txt b/requirements.txt index 0a9e2ad2..8bc1c9f9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ traitlets>=4.2 -jupyter_client>=6.0.0 +jupyter_client @ git+https://github.com/jupyter/jupyter_client nbformat>=5.0 async_generator nest_asyncio From f1e79487f44ad48ce10bdbb89987b6d347b1a270 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Fri, 20 Mar 2020 08:58:13 +0100 Subject: [PATCH 06/12] Require jupyter_client>=6.1.0 --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 6f129963..19441cd4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ traitlets>=4.2 -jupyter_client @ git+https://github.com/jupyter/jupyter_client@master +jupyter_client>=6.1.0 nbformat>=5.0 async_generator nest_asyncio From a93cdee4024ca430b28de54442414069bc1f3b57 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Fri, 20 Mar 2020 09:01:24 +0100 Subject: [PATCH 07/12] Require jupyter_client>=6.1.0 --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 8bc1c9f9..19441cd4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ traitlets>=4.2 -jupyter_client @ git+https://github.com/jupyter/jupyter_client +jupyter_client>=6.1.0 nbformat>=5.0 async_generator nest_asyncio From 0e7d1509396ffe98c8c15843a3f58c39f48eba16 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Tue, 10 Mar 2020 18:04:15 +0100 Subject: [PATCH 08/12] Make nest_asyncio optional --- binder/run_nbclient.ipynb | 2 +- nbclient/client.py | 102 +++++++++++++------------------------- 2 files changed, 36 insertions(+), 68 deletions(-) diff --git a/binder/run_nbclient.ipynb b/binder/run_nbclient.ipynb index d8994b48..9f6f0584 100644 --- a/binder/run_nbclient.ipynb +++ b/binder/run_nbclient.ipynb @@ -46,7 +46,7 @@ "nb = nbf.read('./empty_notebook.ipynb', nbf.NO_CONVERT)\n", "\n", "# Execute our in-memory notebook, which will now have outputs\n", - "nb = nbclient.execute(nb)" + "nb = nbclient.execute(nb, nest_asyncio=True)" ] }, { diff --git a/nbclient/client.py b/nbclient/client.py index 11551c50..1c1dbef1 100644 --- a/nbclient/client.py +++ b/nbclient/client.py @@ -6,8 +6,6 @@ # contextlib, and we `await yield_()` instead of just `yield` from async_generator import asynccontextmanager, async_generator, yield_ -import nest_asyncio - from time import monotonic from queue import Empty import asyncio @@ -97,6 +95,21 @@ class NotebookClient(LoggingConfigurable): ), ).tag(config=True) + nest_asyncio = Bool( + False, + help=dedent( + """ + If False (default), then blocking functions such as `execute` + assume that no event loop is already running. These functions + run their async counterparts (e.g. `async_execute`) in an event + loop with `asyncio.run_until_complete`, which will fail if an + event loop is already running. This can be the case if nbclient + is used e.g. in a Jupyter Notebook. In that case, `nest_asyncio` + should be set to True. + """ + ), + ).tag(config=True) + force_raise_errors = Bool( False, help=dedent( @@ -367,21 +380,9 @@ async def setup_kernel(self, **kwargs): self.kc.stop_channels() self.kc = None - def execute(self, **kwargs): - """ - Executes each code cell (blocking). - - Returns - ------- - nb : NotebookNode - The executed notebook. - """ - loop = get_loop() - return loop.run_until_complete(self.async_execute(**kwargs)) - async def async_execute(self, **kwargs): """ - Executes each code cell asynchronously. + Executes each code cell. Returns ------- @@ -550,48 +551,9 @@ def _check_raise_for_error(self, cell, exec_reply): if (exec_reply is not None) and exec_reply['content']['status'] == 'error': raise CellExecutionError.from_cell_and_msg(cell, exec_reply['content']) - def execute_cell(self, cell, cell_index, execution_count=None, store_history=True): - """ - Executes a single code cell (blocking). - - To execute all cells see :meth:`execute`. - - Parameters - ---------- - cell : nbformat.NotebookNode - The cell which is currently being processed. - cell_index : int - The position of the cell within the notebook object. - execution_count : int - The execution count to be assigned to the cell (default: Use kernel response) - store_history : bool - Determines if history should be stored in the kernel (default: False). - Specific to ipython kernels, which can store command histories. - - Returns - ------- - output : dict - The execution output payload (or None for no output). - - Raises - ------ - CellExecutionError - If execution failed and should raise an exception, this will be raised - with defaults about the failure. - - Returns - ------- - cell : NotebookNode - The cell which was just processed. - """ - loop = get_loop() - return loop.run_until_complete( - self.async_execute_cell(cell, cell_index, execution_count, store_history) - ) - async def async_execute_cell(self, cell, cell_index, execution_count=None, store_history=True): """ - Executes a single code cell asynchronously. + Executes a single code cell. To execute all cells see :meth:`execute`. @@ -788,6 +750,24 @@ def _get_buffer_data(self, msg): return encoded_buffers +def make_blocking(async_method): + def blocking_method(self, *args, **kwargs): + try: + loop = asyncio.get_event_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + if self.nest_asyncio: + import nest_asyncio + nest_asyncio.apply(loop) + return loop.run_until_complete(async_method(self, *args, **kwargs)) + return blocking_method + + +NotebookClient.execute = make_blocking(NotebookClient.async_execute) +NotebookClient.execute_cell = make_blocking(NotebookClient.async_execute_cell) + + def execute(nb, cwd=None, km=None, **kwargs): """Execute a notebook's code, updating outputs within the notebook object. @@ -809,15 +789,3 @@ def execute(nb, cwd=None, km=None, **kwargs): if cwd is not None: resources['metadata'] = {'path': cwd} return NotebookClient(nb=nb, resources=resources, km=km, **kwargs).execute() - - -def get_loop(): - try: - loop = asyncio.get_event_loop() - except RuntimeError: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - return loop - - -nest_asyncio.apply() From 75642b2e50178973065521daf7362cf9c60e892b Mon Sep 17 00:00:00 2001 From: David Brochart Date: Thu, 12 Mar 2020 12:57:21 +0100 Subject: [PATCH 09/12] Explicitly declare execute and execute_cell in NotebookClient class --- nbclient/client.py | 54 ++++++++++++++++++++++++++++++---------------- 1 file changed, 36 insertions(+), 18 deletions(-) diff --git a/nbclient/client.py b/nbclient/client.py index 1c1dbef1..67072599 100644 --- a/nbclient/client.py +++ b/nbclient/client.py @@ -294,6 +294,34 @@ def __init__(self, nb, km=None, **kw): self.km = km self.reset_execution_trackers() + def run_blocking(self, coro): + """Runs a coroutine and blocks until it has executed. + + An event loop is created if no one already exists. If an event loop is + already running, this event loop execution is nested into the already + running one if `nest_asyncio` is set to True. + + Parameters + ---------- + coro : coroutine + The coroutine to be executed. + + Returns + ------- + result : + Whatever the coroutine returns. + """ + try: + loop = asyncio.get_event_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + if self.nest_asyncio: + import nest_asyncio + nest_asyncio.apply(loop) + result = loop.run_until_complete(coro) + return result + def reset_execution_trackers(self): """Resets any per-execution trackers. """ @@ -380,6 +408,9 @@ async def setup_kernel(self, **kwargs): self.kc.stop_channels() self.kc = None + def execute(self, **kwargs): + return self.run_blocking(self.async_execute(**kwargs)) + async def async_execute(self, **kwargs): """ Executes each code cell. @@ -551,6 +582,11 @@ def _check_raise_for_error(self, cell, exec_reply): if (exec_reply is not None) and exec_reply['content']['status'] == 'error': raise CellExecutionError.from_cell_and_msg(cell, exec_reply['content']) + def execute_cell(self, cell, cell_index, execution_count=None, store_history=True): + return self.run_blocking( + self.async_execute_cell(cell, cell_index, execution_count, store_history) + ) + async def async_execute_cell(self, cell, cell_index, execution_count=None, store_history=True): """ Executes a single code cell. @@ -750,24 +786,6 @@ def _get_buffer_data(self, msg): return encoded_buffers -def make_blocking(async_method): - def blocking_method(self, *args, **kwargs): - try: - loop = asyncio.get_event_loop() - except RuntimeError: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - if self.nest_asyncio: - import nest_asyncio - nest_asyncio.apply(loop) - return loop.run_until_complete(async_method(self, *args, **kwargs)) - return blocking_method - - -NotebookClient.execute = make_blocking(NotebookClient.async_execute) -NotebookClient.execute_cell = make_blocking(NotebookClient.async_execute_cell) - - def execute(nb, cwd=None, km=None, **kwargs): """Execute a notebook's code, updating outputs within the notebook object. From f2c2722037ddc1aba0309bf2f6a1e9d55100050f Mon Sep 17 00:00:00 2001 From: David Brochart Date: Thu, 12 Mar 2020 16:22:24 +0100 Subject: [PATCH 10/12] Show meaningful message when nest_asyncio=True is needed --- nbclient/client.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/nbclient/client.py b/nbclient/client.py index 67072599..e5fda1ab 100644 --- a/nbclient/client.py +++ b/nbclient/client.py @@ -319,7 +319,16 @@ def run_blocking(self, coro): if self.nest_asyncio: import nest_asyncio nest_asyncio.apply(loop) - result = loop.run_until_complete(coro) + try: + result = loop.run_until_complete(coro) + except RuntimeError as e: + if str(e) == 'This event loop is already running': + raise RuntimeError( + 'You are trying to run nbclient in an environment where an ' + 'event loop is already running. Please pass `nest_asyncio=True` in ' + '`NotebookClient.execute` and such methods.' + ) + raise return result def reset_execution_trackers(self): From 684064d8a286f3237f84f3fd79b64cde6e0dee75 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Thu, 19 Mar 2020 11:03:38 +0100 Subject: [PATCH 11/12] Change run_blocking into run_sync --- nbclient/client.py | 50 ++++------------------------------- nbclient/tests/test_client.py | 2 +- nbclient/util.py | 47 ++++++++++++++++++++++++++++++++ 3 files changed, 53 insertions(+), 46 deletions(-) create mode 100644 nbclient/util.py diff --git a/nbclient/client.py b/nbclient/client.py index e5fda1ab..87489cd5 100644 --- a/nbclient/client.py +++ b/nbclient/client.py @@ -16,6 +16,7 @@ from nbformat.v4 import output_from_msg from .exceptions import CellTimeoutError, DeadKernelError, CellExecutionComplete, CellExecutionError +from .util import run_sync def timestamp(): @@ -294,43 +295,6 @@ def __init__(self, nb, km=None, **kw): self.km = km self.reset_execution_trackers() - def run_blocking(self, coro): - """Runs a coroutine and blocks until it has executed. - - An event loop is created if no one already exists. If an event loop is - already running, this event loop execution is nested into the already - running one if `nest_asyncio` is set to True. - - Parameters - ---------- - coro : coroutine - The coroutine to be executed. - - Returns - ------- - result : - Whatever the coroutine returns. - """ - try: - loop = asyncio.get_event_loop() - except RuntimeError: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - if self.nest_asyncio: - import nest_asyncio - nest_asyncio.apply(loop) - try: - result = loop.run_until_complete(coro) - except RuntimeError as e: - if str(e) == 'This event loop is already running': - raise RuntimeError( - 'You are trying to run nbclient in an environment where an ' - 'event loop is already running. Please pass `nest_asyncio=True` in ' - '`NotebookClient.execute` and such methods.' - ) - raise - return result - def reset_execution_trackers(self): """Resets any per-execution trackers. """ @@ -417,9 +381,6 @@ async def setup_kernel(self, **kwargs): self.kc.stop_channels() self.kc = None - def execute(self, **kwargs): - return self.run_blocking(self.async_execute(**kwargs)) - async def async_execute(self, **kwargs): """ Executes each code cell. @@ -445,6 +406,8 @@ async def async_execute(self, **kwargs): return self.nb + execute = run_sync(async_execute) + def set_widgets_metadata(self): if self.widget_state: self.nb.metadata.widgets = { @@ -591,11 +554,6 @@ def _check_raise_for_error(self, cell, exec_reply): if (exec_reply is not None) and exec_reply['content']['status'] == 'error': raise CellExecutionError.from_cell_and_msg(cell, exec_reply['content']) - def execute_cell(self, cell, cell_index, execution_count=None, store_history=True): - return self.run_blocking( - self.async_execute_cell(cell, cell_index, execution_count, store_history) - ) - async def async_execute_cell(self, cell, cell_index, execution_count=None, store_history=True): """ Executes a single code cell. @@ -661,6 +619,8 @@ async def async_execute_cell(self, cell, cell_index, execution_count=None, store self.nb['cells'][cell_index] = cell return cell + execute_cell = run_sync(async_execute_cell) + def process_message(self, msg, cell, cell_index): """ Processes a kernel message, updates cell state, and returns the diff --git a/nbclient/tests/test_client.py b/nbclient/tests/test_client.py index 7cb1a7f6..f8fb10b2 100644 --- a/nbclient/tests/test_client.py +++ b/nbclient/tests/test_client.py @@ -492,7 +492,7 @@ def test_kernel_death(self): km = executor.start_kernel_manager() with patch.object(km, "is_alive") as alive_mock: - alive_mock.return_value = make_async(False) + alive_mock.return_value = False # Will be a RuntimeError or subclass DeadKernelError depending # on if jupyter_client or nbconvert catches the dead client first with pytest.raises(RuntimeError): diff --git a/nbclient/util.py b/nbclient/util.py new file mode 100644 index 00000000..1a274792 --- /dev/null +++ b/nbclient/util.py @@ -0,0 +1,47 @@ +"""General utility methods""" + +# Copyright (c) Jupyter Development Team. +# Distributed under the terms of the Modified BSD License. + +import asyncio + + +def run_sync(coro): + """Runs a coroutine and blocks until it has executed. + + An event loop is created if no one already exists. If an event loop is + already running, this event loop execution is nested into the already + running one if `nest_asyncio` is set to True. + + Parameters + ---------- + coro : coroutine + The coroutine to be executed. + + Returns + ------- + result : + Whatever the coroutine returns. + """ + def wrapped(self, *args, **kwargs): + try: + loop = asyncio.get_event_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + if self.nest_asyncio: + import nest_asyncio + nest_asyncio.apply(loop) + try: + result = loop.run_until_complete(coro(self, *args, **kwargs)) + except RuntimeError as e: + if str(e) == 'This event loop is already running': + raise RuntimeError( + 'You are trying to run nbclient in an environment where an ' + 'event loop is already running. Please pass `nest_asyncio=True` in ' + '`NotebookClient.execute` and such methods.' + ) + raise + return result + wrapped.__doc__ = coro.__doc__ + return wrapped From 0c61be435a4fe80b3795f4321877e29e7c80bf03 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Wed, 25 Mar 2020 18:08:21 +0100 Subject: [PATCH 12/12] is_alive is async --- nbclient/tests/test_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nbclient/tests/test_client.py b/nbclient/tests/test_client.py index f8fb10b2..7cb1a7f6 100644 --- a/nbclient/tests/test_client.py +++ b/nbclient/tests/test_client.py @@ -492,7 +492,7 @@ def test_kernel_death(self): km = executor.start_kernel_manager() with patch.object(km, "is_alive") as alive_mock: - alive_mock.return_value = False + alive_mock.return_value = make_async(False) # Will be a RuntimeError or subclass DeadKernelError depending # on if jupyter_client or nbconvert catches the dead client first with pytest.raises(RuntimeError):