-
-
Notifications
You must be signed in to change notification settings - Fork 371
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Drop Tornado #876
Drop Tornado #876
Conversation
f516a39
to
1e60a97
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @davidbrochart for this. I got a question and a comment.
Thanks for the review @fcollonval. |
6b4b0b5
to
5c6282f
Compare
|
||
self.log.debug('Stopping shell ioloop') | ||
shell_io_loop = self.shell_stream.io_loop | ||
shell_io_loop.add_callback(shell_io_loop.stop) | ||
# FIXME: shouldn't need to be threadsafe | ||
shell_io_loop.call_soon_threadsafe(shell_io_loop.stop) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought that we shouldn't need to call_soon_threadsafe
because the shell event loop runs in the main thread, but actually we do because we are processing the shutdown request from the control channel, which runs in another thread.
Correct me if I'm wrong 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you're right
@@ -382,7 +382,7 @@ def start(self): | |||
self.session.send(self.shell_socket, 'execute_request', content, | |||
None, (self.shell_socket.getsockopt(ROUTING_ID))) | |||
|
|||
ident, msg = self.session.recv(self.shell_socket, mode=0) | |||
ident, msg = await self.session.async_recv(self.shell_socket, mode=0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will need jupyter/jupyter_client#749.
ipykernel/iostream.py
Outdated
# FIXME: original code was: | ||
# self._io_loop.call_later(self.flush_interval, self._flush) | ||
self._io_loop.call_soon_threadsafe(self._flush) | ||
self._io_loop.call_soon_threadsafe(self._io_loop.call_later, self.flush_interval, self._flush) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if call_soon_threadsafe
is needed. If not, we could keep the original code.
@@ -367,7 +367,7 @@ def close(self): | |||
if socket and not socket.closed: | |||
socket.close() | |||
self.log.debug("Terminating zmq context") | |||
# FIXME | |||
# FIXME: this line breaks shutdown | |||
# self.context.term() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure why terminating the ZMQ context creates issues here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe related: jupyter-server/jupyter_server#677 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @blink1073, I can see that currently, we don't close the heartbeat socket before terminating the ZMQ context, but the other way around: we wait for the context to be terminated before closing the socket.
Maybe this can be the root cause, @minrk ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Context.term blocks until all sockets are closed and all unsent messages are delivered or discarded. A socket may close after the context if it is done from another thread. But it can't be done from a coroutine, for instance.
The things to check when a context hangs are always:
- are there sockets that haven't been closed, and
- was LINGER set on those sockets so they discard messages eventually
You can debug leftover sockets with self.context._sockets
which is a set of weakrefs to sockets (this is private and subject to change, but works for debugging).
The heartbeat socket doesn't share this context, so I don't think that's relevant.
ipykernel/zmqstream.py
Outdated
def send_multipart(self, msg_list, flags=0, copy=True, track=False, **kwargs): | ||
return self.socket.send_multipart(msg_list, copy=copy) | ||
|
||
def flush(self, flag=None, limit = None): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Flush has historically been very important behavior for ipykernel to behave appropriately. Making it a no-op doesn't seem great to me (It may not be as relevant today with everything being async, but this is a big change).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But flush seems to be supported only for Tornado's IOLoop and ZMQStream, right? I haven't found any reference of it outside of this context.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's true, because send
is async, but not awaitable with ZMQStream.send. We don't need this anymore as long as we await
all our sends, since asyncio Socket.send is awaitable.
|
||
self.log.debug('Stopping shell ioloop') | ||
shell_io_loop = self.shell_stream.io_loop | ||
shell_io_loop.add_callback(shell_io_loop.stop) | ||
# FIXME: shouldn't need to be threadsafe | ||
shell_io_loop.call_soon_threadsafe(shell_io_loop.stop) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you're right
@@ -367,7 +367,7 @@ def close(self): | |||
if socket and not socket.closed: | |||
socket.close() | |||
self.log.debug("Terminating zmq context") | |||
# FIXME | |||
# FIXME: this line breaks shutdown | |||
# self.context.term() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Context.term blocks until all sockets are closed and all unsent messages are delivered or discarded. A socket may close after the context if it is done from another thread. But it can't be done from a coroutine, for instance.
The things to check when a context hangs are always:
- are there sockets that haven't been closed, and
- was LINGER set on those sockets so they discard messages eventually
You can debug leftover sockets with self.context._sockets
which is a set of weakrefs to sockets (this is private and subject to change, but works for debugging).
The heartbeat socket doesn't share this context, so I don't think that's relevant.
Although ipykernel seems functional with these changes, I don't really understand the test failures. |
I wouldn't do that. |
👍 a simpler |
0b9ddbd
to
76ee4ac
Compare
Closing in favor of #1079. |
Closes #656