
open_websocket context manager doesn't exit if there are pending receives #74

Closed
belm0 opened this issue Nov 5, 2018 · 16 comments · Fixed by #79
Assignees: mehaase
Labels: bug (Something isn't working)

Comments

belm0 (Member) commented Nov 5, 2018

is this behaving as intended?

import trio
from trio_websocket import ConnectionClosed, serve_websocket, open_websocket_url


class EchoServer:
    """Websocket server that echos any message starting with '?'."""

    def __init__(self):
        self._server = None

    async def _handleConnection(self, request):
        ws = await request.accept()
        while True:
            try:
                message = await ws.get_message()
                if not message.startswith("?"):
                    return
                await ws.send_message(message)
            except ConnectionClosed:
                break

    async def serve(self, port, *, task_status=trio.TASK_STATUS_IGNORED):
        await serve_websocket(self._handleConnection, '127.0.0.1', port,
                              ssl_context=None, task_status=task_status)


async def test_foo():
    server = EchoServer()
    async with trio.open_nursery() as nursery:
        await nursery.start(server.serve, 1234)
        async with open_websocket_url('ws://127.0.0.1:1234/') as ws:
            await ws.send_message('?foo')
            # test hangs here because the echo response hasn't been consumed?
        nursery.cancel_scope.cancel()

belm0 (Member, Author) commented Nov 5, 2018

(it's surprising to me, and I couldn't find reference to this behavior in the docs)

mehaase (Contributor) commented Nov 5, 2018

Hmm, I don't know why it's doing that. It definitely should not hang there. I will look into it further.

belm0 added the bug label on Nov 6, 2018

belm0 (Member, Author) commented Nov 6, 2018

In the test run at #77, I'm observing that the server exits its aclose() first, and the client aclose() blocks indefinitely on self._close_handshake.wait().

belm0 (Member, Author) commented Nov 6, 2018

debug log

__init__.py                966 DEBUG    Listening on ws://127.0.0.1:50315
__init__.py                 83 DEBUG    Connecting to ws://127.0.0.1:50315/resource
__init__.py                845 DEBUG    conn#1 sending 162 bytes
__init__.py                833 DEBUG    conn#0 received 162 bytes
__init__.py                812 DEBUG    conn#0 received event: ConnectionRequested None
__init__.py                845 DEBUG    conn#0 sending 110 bytes
__init__.py                833 DEBUG    conn#1 received 110 bytes
__init__.py                812 DEBUG    conn#1 received event: ConnectionEstablished None
__init__.py                845 DEBUG    conn#1 sending 11 bytes
__init__.py                833 DEBUG    conn#0 received 11 bytes
__init__.py                812 DEBUG    conn#0 received event: TextReceived hello
test_connection.py          54 DEBUG    SENT MSG
__init__.py                845 DEBUG    conn#0 sending 7 bytes
__init__.py                833 DEBUG    conn#1 received 7 bytes
__init__.py                812 DEBUG    conn#1 received event: TextReceived hello
__init__.py                845 DEBUG    conn#0 sending 4 bytes
test_connection.py          57 DEBUG    WAITING FOR EXIT
__init__.py                845 DEBUG    conn#1 sending 8 bytes
__init__.py                833 DEBUG    conn#0 received 8 bytes
__init__.py                812 DEBUG    conn#0 received event: ConnectionClosed None
__init__.py                852 DEBUG    conn#0 no pending data to send
__init__.py                661 DEBUG    conn#0 websocket closed <ConnectionClosed <CloseReason code=1000 name=NORMAL_CLOSURE reason=None>>
__init__.py                836 DEBUG    conn#0 reader task finished

belm0 (Member, Author) commented Nov 6, 2018

The reader task is processing handlers sequentially. Since the TextReceived handler is waiting for the echoed message to be consumed, the reader task never gets the ConnectionClosed event.

Besides fixing that, it seems like aclose should have some timeout when waiting for ConnectionClosed.
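
Here is a runnable toy of the reader-task problem (plain Trio memory channels, not trio-websocket's actual reader loop; the event names are only illustrative): with a zero-capacity channel, send() blocks until a matching receive(), so a loop that delivers events one at a time never gets past an unconsumed message.

import trio

async def main():
    send_channel, receive_channel = trio.open_memory_channel(0)
    events = ['TextReceived', 'ConnectionClosed']

    async def reader():
        for event in events:
            print('delivering', event)
            # Capacity 0 means this blocks until someone receives; nothing
            # ever receives here, so 'ConnectionClosed' is never delivered.
            await send_channel.send(event)
        print('reader finished')  # never printed

    async with trio.open_nursery() as nursery:
        nursery.start_soon(reader)
        await trio.sleep(0.5)
        nursery.cancel_scope.cancel()  # give up on the stuck reader

trio.run(main)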

mehaase (Contributor) commented Nov 6, 2018

Thanks for debugging this!

We can alleviate this by increasing the size of the memory channel, but it will always be possible for the reader task to get blocked by unread messages. I suggest that we make the size of the memory channel a configurable setting and add a maximum size per message. I think Nathaniel already suggested this somewhere in another open issue. It's also how aaugustin's websockets library handles userspace buffering:

The max_size parameter enforces the maximum size for incoming messages in bytes. The default value is 1MB. None disables the limit. If a message larger than the maximum size is received, recv() will raise ConnectionClosed and the connection will be closed with status code 1009.

The max_queue parameter sets the maximum length of the queue that holds incoming messages. The default value is 32. 0 disables the limit. Messages are added to an in-memory queue when they’re received; then recv() pops from that queue. In order to prevent excessive memory consumption when messages are received faster than they can be processed, the queue must be bounded. If the queue fills up, the protocol stops processing incoming data until recv() is called. In this situation, various receive buffers (at least in asyncio and in the OS) will fill up, then the TCP receive window will shrink, slowing down transmission to avoid packet loss.

The only change I would make to this is to drop the "zero disables the limit" behavior, since zero is a valid size for a Trio channel, and the caller can pass a really big number to achieve practically the same thing.
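
A minimal sketch of what that proposal could look like (hypothetical names, not an API the library actually exposes): the channel capacity bounds how many unread messages are buffered, and oversized messages are rejected before they are buffered.

import trio

def make_message_buffer(max_queue=32, max_size=2**20):
    send_channel, receive_channel = trio.open_memory_channel(max_queue)

    async def deliver(message):
        if len(message) > max_size:
            # Mirror the websockets behavior quoted above: refuse messages
            # larger than the limit (a real implementation might instead
            # close the connection with code 1009).
            raise ValueError('message exceeds max_size')
        await send_channel.send(message)  # applies backpressure when full

    return deliver, receive_channel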

We should also add a note in the documentation that highlights this issue, because it's a surprise to people who don't frequently write networking code.

Besides fixing that, it seems like aclose should have some timeout when waiting for ConnectionClosed.

This is a good idea. Timeouts in general have not been adequately addressed in this library to date. I opened up #64 to work on timeouts throughout the library.

njsmith (Member) commented Nov 6, 2018

really big number to practically achieve the same thing

In fact Trio channels support inf as the size.

it will always be possible for the reader task to get blocked by unread messages

It seems like it might be possible to unblock it when shutting down, though?

Note that if you close a channel handle, then any tasks that are blocked on that handle immediately resume with a ClosedResourceError, so it should be easy to wake up the reader task in this case.
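
A quick runnable check of that behavior with plain Trio (no trio-websocket involved): a task blocked in receive() is woken immediately with ClosedResourceError once its channel handle is closed.

import trio

async def main():
    send_channel, receive_channel = trio.open_memory_channel(0)

    async def blocked_reader():
        try:
            await receive_channel.receive()  # blocks: nothing is ever sent
        except trio.ClosedResourceError:
            print('reader woken by close')

    async with trio.open_nursery() as nursery:
        nursery.start_soon(blocked_reader)
        await trio.sleep(0.1)            # let the reader block
        await receive_channel.aclose()   # wakes the blocked reader at once

trio.run(main)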

belm0 (Member, Author) commented Nov 7, 2018

Between the two (buffering and timeout control), the latter is higher priority for me-- specifically the guarantee that aclose() doesn't take an inordinate amount of time. This issue is blocking migration of my app's ws client to trio-websocket. (I would not want to configure a large queue, and given reasonable queue sizes there is always the case that it will get filled.)

Nathaniel's suggestion sounds promising. I think it means that aclose() should close the receive channel used by get_message() before waiting on the close handshake. Either the data-received handler or reader task would then need to deal with ClosedResourceError if we want to advance to the ConnectionClosed event-- I'm not sure how that should look.
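
One hedged possibility for how that could look (hypothetical attribute names, not the library's actual code): get_message() translates a closed or exhausted channel back into ConnectionClosed for the caller.

import trio
from trio_websocket import ConnectionClosed

class _ConnectionSketch:
    async def get_message(self):
        try:
            return await self._recv_channel.receive()
        except (trio.ClosedResourceError, trio.EndOfChannel):
            # The channel was closed by aclose(), or drained after the remote
            # endpoint closed; report it as a normal connection-closed error.
            raise ConnectionClosed(self._close_reason) from None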

mehaase added a commit that referenced this issue Nov 7, 2018
As described in the issue, get_message() was raising connection closed
even if there were pending messages. Per Nathaniel's suggestion, the
proper behavior is this:

1. If the remote endpoint closed the connection, then the local endpoint
   may continue reading all messages sent prior to closing.
2. If the local endpoint closed the connection, then the local endpoint
   may not read any more messages.

I added tests for these two conditions and implemented the behavior by
closing the ReceiveChannel inside the connection's `aclose()`. This
requires a bit of additional exception handling inside `get_message()`
and inside the reader task.

One slight surprise is that the test can't be written due to the bug
in #74! The client would hang because the reader task is blocked by
the unconsumed messages. So I changed the channel size to 32, which
allows this test to work, and I will replace this hard-coded value
when I fix #74.

mehaase (Contributor) commented Nov 8, 2018

I think it means that aclose() should close the receive channel used by get_message() before waiting on the close handshake.

This is now done on PR #79.

Between the two (buffering and timeout control), the latter is higher priority for me

I'd like to keep this ticket dedicated to configuring the message buffer and address timeouts in #64. Can you please take a look at #64 and share your opinion on the various options presented there?

belm0 (Member, Author) commented Nov 9, 2018

I think it means that aclose() should close the receive channel used by get_message() before waiting on the close handshake.

This is now done on PR #79.

Nathaniel was implying that this would unblock the reader task and allow it to receive the ConnectionClosed, resolving the original issue. I'd like to understand this better.

I'd like to keep this ticket dedicated to configuring the message buffer and address timeouts in #64. Can you please take a look at #64 and share your opinion on the various options presented there?

Please organize things as you see fit, but I'd like to reiterate that this bug is about indefinite blocking of the context manager, and it can't be fully addressed by offering buffer configuration. I don't want to use a buffer size of more than 0 or 1, and even reasonable sizes beyond that can trivially become filled. (E.g. an app might do some blocking work between the last ws read and exiting the cm scope, during which time a lot of messages could have been received.)

The priorities I see, in order, are: 1) confirm whether Nathaniel's suggestion can allow normal shutdown, at least in the case where the server is being responsive, 2) add some configurable timeout to waiting for close handshake from the server, 3) provide buffering configuration.
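
For item 2, here is a hedged caller-side sketch of bounding the close today by cancelling aclose() with a Trio timeout (a workaround at the call site, not behavior the library provides):

import trio

async def close_with_timeout(ws, seconds=5):
    with trio.move_on_after(seconds) as cancel_scope:
        await ws.aclose()
    if cancel_scope.cancelled_caught:
        print('close handshake did not finish within', seconds, 'seconds')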

belm0 (Member, Author) commented Nov 9, 2018

With PR #79, but with the buffer size reverted to 0, the test I wrote in #77 for this bug now passes.

Regarding test_read_messages_after_remote_close in #79: even in the absence of this bug it still hangs, but that's due to a deadlock in the test as written. With a buffer size of 0, the server handler can never send its 2nd message and set the event, because the client is waiting for the event before consuming the messages.

So the buffer workaround in #79 should be removed, and the test deadlock fixed. That's enough to close this bug, and the timeout / buffer enhancements are future work. (Timeout still sounds higher priority.)

mehaase self-assigned this on Nov 9, 2018

mehaase (Contributor) commented Nov 9, 2018

With a buffer size of 0, the server handler can never send its 2nd message and set the event, because the client is waiting for the event to consume the messages.

The server sends all of its messages (they end up in some kernel buffer, I guess), but its context manager cannot exit because it's waiting for the client to complete the closing handshake. The client won't complete the handshake because the reader task is blocked trying to deliver the first message.

I don't think this test deadlock can be "fixed", because this is exactly the behavior that we want to test: the client has done the closing handshake but the caller can still get pending messages. I have disabled the test for the time being and will re-enable it when the buffer size can be configured.

belm0 (Member, Author) commented Nov 9, 2018

The server sends all of its messages (they end up in some kernel buffer, I guess), but its context manager cannot exit because its waiting for the client to handshake. The client won't handshake because the reader task is blocked trying to deliver the first message.

This is not what I observed in the debugger. With a buffer size of 0, the server handler actually blocked on the very first send_message(). This was very surprising-- I'd appreciate it if you would confirm and perhaps make sense of it.

belm0 pushed a commit to belm0/trio-websocket that referenced this issue Nov 9, 2018 (same commit message as above).

belm0 pushed a commit to belm0/trio-websocket that referenced this issue Nov 10, 2018 (same commit message as above).

mehaase (Contributor) commented Nov 10, 2018

I haven't tried in the debugger yet. I added a bunch of logging statements and a trio timeout to trace the code:

async def test_read_messages_after_remote_close(nursery):
    '''
    When the remote endpoint closes, the local endpoint can still read all
    of the messages sent prior to closing. Any attempt to read beyond that will
    raise ConnectionClosed.
    '''
    server_closed = trio.Event()
    import logging

    with trio.fail_after(2):
        async def handler(request):
            server = await request.accept()
            async with server:
                logging.debug('server A')
                await server.send_message('1')
                logging.debug('server B')
                await server.send_message('2')
                logging.debug('server C')
            server_closed.set()
            logging.debug('server D')

        server = await nursery.start(
            partial(serve_websocket, handler, HOST, 0, ssl_context=None))

        async with open_websocket(HOST, server.port, '/', use_ssl=False) as client:
            logging.debug('client A')
            await server_closed.wait()
            logging.debug('client B')
            assert await client.get_message() == '1'
            logging.debug('client C')
            assert await client.get_message() == '2'
            logging.debug('client D')
            with pytest.raises(ConnectionClosed):
                logging.debug('client E')
                await client.get_message()
            logging.debug('client F')

Running this produces the following logs:

(venv) mhaase@MEH-MBP:/V/C/trio-websocket:master$ pytest -k test_read_messages_after_remote_close --log-level debug
=============================================== test session starts ================================================
platform darwin -- Python 3.7.0, pytest-3.10.0, py-1.7.0, pluggy-0.8.0
rootdir: /Volumes/Case_Sensitive/trio-websocket, inifile: pytest.ini
plugins: trio-0.5.1, cov-2.6.0
collected 30 items / 29 deselected

tests/test_connection.py F                                                                                   [100%]

===================================================== FAILURES =====================================================
______________________________________ test_read_messages_after_remote_close _______________________________________

... SNIP EXCEPTIONS ...

venv/lib/python3.7/site-packages/trio/_timeouts.py:118: TooSlowError
------------------------------------------------ Captured log call -------------------------------------------------
_impl.py                   960 DEBUG    Listening on ws://127.0.0.1:52315
_impl.py                    82 DEBUG    Connecting to ws://127.0.0.1:52315/
_impl.py                   839 DEBUG    conn#1 sending 154 bytes
_impl.py                   827 DEBUG    conn#0 received 154 bytes
_impl.py                   806 DEBUG    conn#0 received event: ConnectionRequested
_impl.py                   839 DEBUG    conn#0 sending 110 bytes
_impl.py                   827 DEBUG    conn#1 received 110 bytes
_impl.py                   806 DEBUG    conn#1 received event: ConnectionEstablished
test_connection.py         400 DEBUG    server A
test_connection.py         412 DEBUG    client A
_impl.py                   839 DEBUG    conn#0 sending 3 bytes
test_connection.py         402 DEBUG    server B
_impl.py                   827 DEBUG    conn#1 received 3 bytes
_impl.py                   806 DEBUG    conn#1 received event: TextReceived
_impl.py                   839 DEBUG    conn#0 sending 3 bytes
test_connection.py         404 DEBUG    server C
_impl.py                   839 DEBUG    conn#0 sending 4 bytes
_impl.py                   658 DEBUG    conn#0 websocket closed <ConnectionClosed <CloseReason code=1006 name=ABNORMAL_CLOSURE reason=None>>
_impl.py                   830 DEBUG    conn#0 reader task finished
===================================== 1 failed, 29 deselected in 2.22 seconds ======================================
(venv) mhaase@MEH-MBP:/V/C/trio-websocket:master$

The client prints "A" and then hangs: it is waiting for the server_closed event.

The server prints "A", "B", "C", and then hangs: it must be stuck in the CM exit. The logs also indicate that the client conn#1 gets one TextReceived event and then no further events, indicating that the reader task is blocked.

I'll try this out in the debugger next.

mehaase (Contributor) commented Nov 10, 2018

Trying this out in the debugger. I reverted the logging code from the previous post and added import pdb and pdb.set_trace() to the unit test.

(venv) mhaase@MEH-MBP:/V/C/trio-websocket:master$ pytest -k test_read_messages_after_remote_close --pdb
=== test session starts ===
platform darwin -- Python 3.7.0, pytest-3.10.0, py-1.7.0, pluggy-0.8.0
rootdir: /Volumes/Case_Sensitive/trio-websocket, inifile: pytest.ini
plugins: trio-0.5.1, cov-2.6.0
collected 30 items / 29 deselected

tests/test_connection.py
>>> PDB set_trace (IO-capturing turned off) >>>
> /Volumes/Case_Sensitive/trio-websocket/tests/test_connection.py(405)test_read_messages_after_remote_close()
-> server = await nursery.start(

(Pdb) break handler
Breakpoint 1 at /Volumes/Case_Sensitive/trio-websocket/tests/test_connection.py:397
(Pdb) c
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> PDB continue (IO-capturing resumed) >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> /Volumes/Case_Sensitive/trio-websocket/tests/test_connection.py(398)handler()
-> server = await request.accept()
(Pdb) n
> /Volumes/Case_Sensitive/trio-websocket/tests/test_connection.py(399)handler()
-> async with server:
(Pdb)
> /Volumes/Case_Sensitive/trio-websocket/tests/test_connection.py(400)handler()
-> await server.send_message('1')
(Pdb)
Internal StopIteration: <trio_websocket._impl.WebSocketConnection object at 0x105b45b70>
> /Volumes/Case_Sensitive/trio-websocket/tests/test_connection.py(408)test_read_messages_after_remote_close()
-> async with open_websocket(HOST, server.port, '/', use_ssl=False) as client:
(Pdb)
> /Volumes/Case_Sensitive/trio-websocket/tests/test_connection.py(409)test_read_messages_after_remote_close()
-> await server_closed.wait()
(Pdb)
Internal StopIteration
> /Volumes/Case_Sensitive/trio-websocket/tests/test_connection.py(400)handler()
-> await server.send_message('1')
(Pdb)
> /Volumes/Case_Sensitive/trio-websocket/tests/test_connection.py(401)handler()
-> await server.send_message('2')
(Pdb)

This looks like it sends two messages.

belm0 (Member, Author) commented Nov 10, 2018

I'm debugging in PyCharm (which uses pdb, I believe). I tried again and it now matches what you observed. Sorry for taking your time!
