Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove nest-asyncio dependency #835

Merged
merged 53 commits into from
Oct 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
15988e2
remove nest-asyncio dependency
blink1073 Sep 14, 2022
d871bf1
fix typing
blink1073 Sep 14, 2022
6fe99ae
use a task runner
blink1073 Sep 14, 2022
d376582
try to fix threaded channel
blink1073 Sep 14, 2022
dadbd68
fix channels
blink1073 Sep 14, 2022
b69c855
fix client
blink1073 Sep 14, 2022
016b22d
attempt to fix ipykernel
blink1073 Sep 14, 2022
c2b4e48
remove hack
blink1073 Sep 14, 2022
00ac661
more cleanup and debug
blink1073 Sep 14, 2022
57f5fd8
fix restarter tests
blink1073 Sep 14, 2022
5e8f31b
more fixes
blink1073 Sep 15, 2022
75038df
fix session
blink1073 Sep 15, 2022
b5d3eb0
make function async
blink1073 Sep 16, 2022
9d2c0ff
clean up
blink1073 Sep 16, 2022
f99b196
try without timing out tests
blink1073 Sep 16, 2022
14b11c1
skip all parallel process tests
blink1073 Sep 16, 2022
f4c1c9a
skip another one
blink1073 Sep 16, 2022
059ce9b
see if that fixes tests
blink1073 Sep 16, 2022
5ec0407
refactor
blink1073 Sep 17, 2022
3e757c7
more cleanup
blink1073 Sep 17, 2022
44aca67
clean up session tests
blink1073 Sep 17, 2022
af387d2
add async session
blink1073 Sep 18, 2022
ec2ab21
add tests for async client
blink1073 Sep 18, 2022
99a800b
use main thread where possible
blink1073 Sep 18, 2022
0986cfe
make session and channels fully sync
blink1073 Sep 18, 2022
510b77a
more progress
blink1073 Sep 18, 2022
f33c02c
fix more
blink1073 Sep 19, 2022
ad1d52a
fix another test
blink1073 Sep 19, 2022
8c273fe
fix threaded client
blink1073 Sep 19, 2022
c01e7bf
uncomment test
blink1073 Sep 19, 2022
d3a21c4
Merge branch 'synchronous_managers' of github.com:blink1073/jupyter_c…
blink1073 Sep 19, 2022
ab93126
fix threaded tests
blink1073 Sep 19, 2022
cdc1839
lint
blink1073 Sep 19, 2022
68ee9d8
increase timeout and ignore resource warnings on Windows
blink1073 Sep 20, 2022
157d68a
fix warning filter
blink1073 Sep 20, 2022
b12f9e6
cleanup
blink1073 Sep 20, 2022
478c954
cleanup
blink1073 Sep 20, 2022
4a4353d
fix handling of async zmq streams
blink1073 Sep 20, 2022
31eda17
better handling of futures
blink1073 Sep 20, 2022
7325663
use futures instead of await
blink1073 Sep 20, 2022
a4c38e4
lint and fix for execute_interactive
blink1073 Sep 20, 2022
30a3fac
do not require self arg in run_sync
blink1073 Sep 21, 2022
a338e3a
debug timeout failures
blink1073 Sep 21, 2022
2d1cf6a
Update jupyter_client/ioloop/manager.py
blink1073 Sep 22, 2022
c6ed212
better handling of zmq sockets
blink1073 Sep 22, 2022
fe677c4
lint
blink1073 Sep 22, 2022
449ac8a
Update jupyter_client/ioloop/manager.py
blink1073 Sep 23, 2022
d408d25
close original socket
blink1073 Sep 23, 2022
ebf5527
skip failing test on ubuntu
blink1073 Sep 25, 2022
dfbcd7e
remove unnecessary run_sync
blink1073 Sep 25, 2022
293e159
avoid creating throwaway socket
blink1073 Sep 27, 2022
80ef4c3
Merge branch 'synchronous_managers' of github.com:blink1073/jupyter_c…
blink1073 Sep 27, 2022
097fa1e
use asyncio.Future
blink1073 Sep 29, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/downstream.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ concurrency:
jobs:
ipykernel:
runs-on: ubuntu-latest
timeout-minutes: 10
timeout-minutes: 15
steps:
- uses: actions/checkout@v2
- uses: jupyterlab/maintainer-tools/.github/actions/base-setup@v1
Expand All @@ -23,7 +23,7 @@ jobs:

nbclient:
runs-on: ubuntu-latest
timeout-minutes: 10
timeout-minutes: 15
steps:
- uses: actions/checkout@v2
- uses: jupyterlab/maintainer-tools/.github/actions/base-setup@v1
Expand All @@ -34,7 +34,7 @@ jobs:

nbconvert:
runs-on: ubuntu-latest
timeout-minutes: 10
timeout-minutes: 15
steps:
- uses: actions/checkout@v2
- uses: jupyterlab/maintainer-tools/.github/actions/base-setup@v1
Expand All @@ -45,7 +45,7 @@ jobs:

jupyter_server:
runs-on: ubuntu-latest
timeout-minutes: 10
timeout-minutes: 15
steps:
- uses: actions/checkout@v2
- uses: jupyterlab/maintainer-tools/.github/actions/base-setup@v1
Expand Down
3 changes: 2 additions & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ jobs:
- name: Run the tests on pypy and windows
if: ${{ startsWith(matrix.python-version, 'pypy') || startsWith(matrix.os, 'windows') }}
run: |
python -m pytest -vv || python -m pytest -vv --lf
# Ignore warnings on Windows and PyPI
python -m pytest -vv -W ignore || python -m pytest -vv -W ignore --lf

- name: Code coverage
run: codecov
Expand Down
27 changes: 20 additions & 7 deletions jupyter_client/asynchronous/client.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
"""Implements an async kernel client"""
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
import asyncio

import zmq.asyncio
from traitlets import Instance
from traitlets import Type

from jupyter_client.channels import AsyncZMQSocketChannel
from jupyter_client.channels import HBChannel
from jupyter_client.channels import ZMQSocketChannel
from jupyter_client.client import KernelClient
from jupyter_client.client import reqrep

Expand All @@ -14,9 +18,11 @@ def _(self, *args, **kwargs):
reply = kwargs.pop("reply", False)
timeout = kwargs.pop("timeout", None)
msg_id = meth(self, *args, **kwargs)
fut: asyncio.Future = asyncio.Future()
fut.set_result(msg_id)
if not reply:
return msg_id
return self._async_recv_reply(msg_id, timeout=timeout, channel=channel)
return fut
return self._recv_reply(msg_id, timeout=timeout, channel=channel)

return _

Expand All @@ -28,6 +34,12 @@ class AsyncKernelClient(KernelClient):
raising :exc:`queue.Empty` if no message arrives within ``timeout`` seconds.
"""

context = Instance(zmq.asyncio.Context)

def _context_default(self) -> zmq.asyncio.Context:
self._created_context = True
return zmq.asyncio.Context()

# --------------------------------------------------------------------------
# Channel proxy methods
# --------------------------------------------------------------------------
Expand All @@ -40,18 +52,19 @@ class AsyncKernelClient(KernelClient):
wait_for_ready = KernelClient._async_wait_for_ready

# The classes to use for the various channels
shell_channel_class = Type(ZMQSocketChannel)
iopub_channel_class = Type(ZMQSocketChannel)
stdin_channel_class = Type(ZMQSocketChannel)
shell_channel_class = Type(AsyncZMQSocketChannel)
iopub_channel_class = Type(AsyncZMQSocketChannel)
stdin_channel_class = Type(AsyncZMQSocketChannel)
hb_channel_class = Type(HBChannel)
control_channel_class = Type(ZMQSocketChannel)
control_channel_class = Type(AsyncZMQSocketChannel)

_recv_reply = KernelClient._async_recv_reply

# replies come on the shell channel
execute = reqrep(wrapped, KernelClient.execute)
history = reqrep(wrapped, KernelClient.history)
complete = reqrep(wrapped, KernelClient.complete)
is_complete = reqrep(wrapped, KernelClient.is_complete)
inspect = reqrep(wrapped, KernelClient.inspect)
kernel_info = reqrep(wrapped, KernelClient.kernel_info)
comm_info = reqrep(wrapped, KernelClient.comm_info)
Expand Down
2 changes: 1 addition & 1 deletion jupyter_client/blocking/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def _(self, *args, **kwargs):
msg_id = meth(self, *args, **kwargs)
if not reply:
return msg_id
return run_sync(self._async_recv_reply)(msg_id, timeout=timeout, channel=channel)
return self._recv_reply(msg_id, timeout=timeout, channel=channel)

return _

Expand Down
107 changes: 82 additions & 25 deletions jupyter_client/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from .channelsabc import HBChannelABC
from .session import Session
from jupyter_client import protocol_version_info
from jupyter_client.utils import ensure_async

# import ZMQError in top-level namespace, to avoid ugly attribute-error messages
# during garbage collection of threads at exit
Expand Down Expand Up @@ -49,15 +50,15 @@ class HBChannel(Thread):

def __init__(
self,
context: t.Optional[zmq.asyncio.Context] = None,
context: t.Optional[zmq.Context] = None,
session: t.Optional[Session] = None,
address: t.Union[t.Tuple[str, int], str] = "",
):
"""Create the heartbeat monitor thread.

Parameters
----------
context : :class:`zmq.asyncio.Context`
context : :class:`zmq.Context`
The ZMQ context to use.
session : :class:`session.Session`
The session to use.
Expand Down Expand Up @@ -106,12 +107,6 @@ def _create_socket(self) -> None:

self.poller.register(self.socket, zmq.POLLIN)

def run(self) -> None:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self._async_run())
loop.close()

async def _async_run(self) -> None:
"""The thread's main activity. Call start() instead."""
self._create_socket()
Expand All @@ -127,16 +122,16 @@ async def _async_run(self) -> None:

since_last_heartbeat = 0.0
# no need to catch EFSM here, because the previous event was
# either a recv or connect, which cannot be followed by EFSM
await self.socket.send(b"ping")
# either a recv or connect, which cannot be followed by EFSM)
await ensure_async(self.socket.send(b"ping"))
request_time = time.time()
# Wait until timeout
self._exit.wait(self.time_to_dead)
# poll(0) means return immediately (see http://api.zeromq.org/2-1:zmq-poll)
self._beating = bool(self.poller.poll(0))
if self._beating:
# the poll above guarantees we have something to recv
await self.socket.recv()
await ensure_async(self.socket.recv())
continue
elif self._running:
# nothing was received within the time limit, signal heart failure
Expand All @@ -146,6 +141,12 @@ async def _async_run(self) -> None:
self._create_socket()
continue

def run(self) -> None:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self._async_run())
loop.close()

def pause(self) -> None:
"""Pause the heartbeat."""
self._pause = True
Expand Down Expand Up @@ -191,14 +192,14 @@ def call_handlers(self, since_last_heartbeat: float) -> None:


class ZMQSocketChannel(object):
"""A ZMQ socket in an async API"""
"""A ZMQ socket wrapper"""

def __init__(self, socket: zmq.asyncio.Socket, session: Session, loop: t.Any = None) -> None:
def __init__(self, socket: zmq.Socket, session: Session, loop: t.Any = None) -> None:
"""Create a channel.

Parameters
----------
socket : :class:`zmq.asyncio.Socket`
socket : :class:`zmq.Socket`
The ZMQ socket to use.
session : :class:`session.Session`
The session to use.
Expand All @@ -207,42 +208,41 @@ def __init__(self, socket: zmq.asyncio.Socket, session: Session, loop: t.Any = N
"""
super().__init__()

self.socket: t.Optional[zmq.asyncio.Socket] = socket
self.socket: t.Optional[zmq.Socket] = socket
self.session = session

async def _recv(self, **kwargs: t.Any) -> t.Dict[str, t.Any]:
def _recv(self, **kwargs: t.Any) -> t.Dict[str, t.Any]:
assert self.socket is not None
msg = await self.socket.recv_multipart(**kwargs)
msg = self.socket.recv_multipart(**kwargs)
ident, smsg = self.session.feed_identities(msg)
return self.session.deserialize(smsg)

async def get_msg(self, timeout: t.Optional[float] = None) -> t.Dict[str, t.Any]:
def get_msg(self, timeout: t.Optional[float] = None) -> t.Dict[str, t.Any]:
"""Gets a message if there is one that is ready."""
assert self.socket is not None
if timeout is not None:
timeout *= 1000 # seconds to ms
ready = await self.socket.poll(timeout)

ready = self.socket.poll(timeout)
if ready:
res = await self._recv()
res = self._recv()
return res
else:
raise Empty

async def get_msgs(self) -> t.List[t.Dict[str, t.Any]]:
def get_msgs(self) -> t.List[t.Dict[str, t.Any]]:
"""Get all messages that are currently ready."""
msgs = []
while True:
try:
msgs.append(await self.get_msg())
msgs.append(self.get_msg())
except Empty:
break
return msgs

async def msg_ready(self) -> bool:
def msg_ready(self) -> bool:
"""Is there a message that has been received?"""
assert self.socket is not None
return bool(await self.socket.poll(timeout=0))
return bool(self.socket.poll(timeout=0))

def close(self) -> None:
if self.socket is not None:
Expand All @@ -264,3 +264,60 @@ def send(self, msg: t.Dict[str, t.Any]) -> None:

def start(self) -> None:
pass


class AsyncZMQSocketChannel(ZMQSocketChannel):
"""A ZMQ socket in an async API"""

socket: zmq.asyncio.Socket

def __init__(self, socket: zmq.asyncio.Socket, session: Session, loop: t.Any = None) -> None:
"""Create a channel.

Parameters
----------
socket : :class:`zmq.asyncio.Socket`
The ZMQ socket to use.
session : :class:`session.Session`
The session to use.
loop
Unused here, for other implementations
"""
if not isinstance(socket, zmq.asyncio.Socket):
raise ValueError('Socket must be asyncio')
super().__init__(socket, session)

async def _recv(self, **kwargs: t.Any) -> t.Dict[str, t.Any]: # type:ignore[override]
assert self.socket is not None
msg = await self.socket.recv_multipart(**kwargs)
_, smsg = self.session.feed_identities(msg)
return self.session.deserialize(smsg)

async def get_msg( # type:ignore[override]
self, timeout: t.Optional[float] = None
) -> t.Dict[str, t.Any]:
"""Gets a message if there is one that is ready."""
assert self.socket is not None
if timeout is not None:
timeout *= 1000 # seconds to ms
ready = await self.socket.poll(timeout)
if ready:
res = await self._recv()
return res
else:
raise Empty

async def get_msgs(self) -> t.List[t.Dict[str, t.Any]]: # type:ignore[override]
"""Get all messages that are currently ready."""
msgs = []
while True:
try:
msgs.append(await self.get_msg())
except Empty:
break
return msgs

async def msg_ready(self) -> bool: # type:ignore[override]
"""Is there a message that has been received?"""
assert self.socket is not None
return bool(await self.socket.poll(timeout=0))
Loading