detect closed StreamWriter/StreamReader not working #120

Closed
komuw opened this issue May 30, 2019 · 4 comments · Fixed by #113
Labels
bug Something isn't working

Comments

@komuw
Owner

komuw commented May 30, 2019

import asyncio
import logging

logging.basicConfig(level=logging.DEBUG)
smsc_host = "127.0.0.1"
smsc_port = 2775

async def connect():
    reader, writer = await asyncio.wait_for(
        asyncio.open_connection(smsc_host, smsc_port), timeout=3.4
    )
    writer.transport.set_write_buffer_limits(16)
    return reader, writer


loop = asyncio.get_event_loop()
reader, writer = loop.run_until_complete(connect())

async def main():
    while True:
        print("writing. writer={0}".format(writer.transport.get_protocol()._stream_writer))

        # https://github.com/aio-libs/aiohttp/issues/2499
        if writer.transport is None or writer.transport.is_closing():
            raise ConnectionResetError("Cannot write to closing transport")
        writer.write(b"msg")
        await writer.drain()

        print("written. reader={0}".format(writer.transport.get_protocol()._stream_reader))
        print("closing={0}".format(writer.transport.is_closing()))

        try:
            await asyncio.wait_for(reader.read(4), timeout=1.2)
        except asyncio.TimeoutError as e:
            print("\n\n\t timeout", str(e))
            print("\n")

        await asyncio.sleep(3)

loop.run_until_complete(main())
  1. start a server at 127.0.0.1:2775
  2. run the code above
  3. stop the server at 127.0.0.1:2775
  4. the code above continues running as if the server were still available.
    writer.transport.is_closing() is supposed to tell us when the connection is broken, but it does not.
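
A possible reason `is_closing()` stays `False` here: as far as I can tell it only reports that the local side has started closing, or that the transport has already hit a fatal error. A peer that goes away is usually noticed through a read instead, because `reader.read()` returns `b""` at EOF. Below is a minimal sketch of that check, assuming Python 3.7+ for `asyncio.run`. `peer_closed` and `demo` are hypothetical names for illustration, not part of asyncio or naz, and the helper consumes whatever bytes it reads.

```python
import asyncio


async def peer_closed(reader, timeout=1.0):
    """Return True if the peer has closed its side of the connection.

    A read that returns b"" means EOF. A timeout only means the peer is
    silent, so it is reported as "not closed".
    """
    try:
        data = await asyncio.wait_for(reader.read(4), timeout=timeout)
    except asyncio.TimeoutError:
        return False  # no data yet, but no EOF either
    except ConnectionError:
        return True  # reset or broken pipe: the connection is gone
    return data == b""


async def demo():
    # Hypothetical local server that hangs up as soon as a client connects.
    async def handle(reader, writer):
        writer.close()

    server = await asyncio.start_server(handle, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    try:
        return await peer_closed(reader)
    finally:
        writer.close()
        server.close()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

This only helps when the peer's close actually reaches us; it would not catch a proxy that keeps the socket open.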
@komuw komuw added the bug Something isn't working label May 30, 2019
@komuw
Owner Author

komuw commented May 30, 2019

This one, connecting to google.com, works (the lost connection is eventually detected):

import sys
import asyncio
import logging

# using python3.6
logging.basicConfig(format="%(message)s", stream=sys.stdout, level=logging.DEBUG)

async def connect():
    reader, writer = await asyncio.open_connection(host="google.com", port=80)
    writer.transport.set_write_buffer_limits(2)
    return reader, writer

async def main():
    _, writer = await connect()
    while True:
        # https://github.com/aio-libs/aiohttp/issues/2499
        if writer.transport is None or writer.transport.is_closing():
            # this should detect lost connection but it doesn't
            raise ConnectionResetError("Cannot write to closing transport")
        print("about to write")
        writer.write(b"SomeMessage")
        await writer.drain()
        print("written")
        await asyncio.sleep(4)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
  1. start the code
  2. turn off wifi (after that, it continues writing for a short while, then raises an error)
about to write
written
about to write
written
about to write
written
about to write
written
about to write
written
about to write
written

	 _read_ready called

self.max_size:  262144
self._conn_lost:  0

[Errno 60] Operation timed out
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/selector_events.py", line 718, in _read_ready
    data = self._sock.recv(self.max_size)
TimeoutError: [Errno 60] Operation timed out
Fatal read error on socket transport
protocol: <asyncio.streams.StreamReaderProtocol object at 0x103d16668>
transport: <_SelectorSocketTransport fd=9 read=polling write=<idle, bufsize=0>>

@komuw
Owner Author

komuw commented May 30, 2019

However, the following works okay:

import asyncio

async def connect():
    reader, writer = await asyncio.open_connection(host="127.0.0.1", port=8000)
    return reader, writer

async def main():
    _, writer = await connect()
    while True:
        # https://github.com/aio-libs/aiohttp/issues/2499
        if writer.transport is None or writer.transport.is_closing():
            # this should detect lost connection but it doesn't
            raise ConnectionResetError("Cannot write to closing transport")

        print("about to write")
        writer.write(b"SomeMessage")
        await writer.drain()
        print("written")
        await asyncio.sleep(1)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
  1. start a python server on localhost
    python2 -m SimpleHTTPServer 8000
  2. run the code above
  3. kill the python server
about to write
written
about to write
written
about to write
written
about to write
written
about to write
written
about to write
Traceback (most recent call last):
  File "cool.py", line 26, in <module>
    loop.run_until_complete(main())
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 484, in run_until_complete
    return future.result()
  File "cool.py", line 19, in main
    await writer.drain()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/streams.py", line 337, in drain
    yield from self._protocol._drain_helper()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/streams.py", line 211, in _drain_helper
    raise ConnectionResetError("Connection lost")
ConnectionResetError: Connection lost

@komuw
Owner Author

komuw commented May 30, 2019

I'm beginning to suspect that the difference between the example above that works (using SimpleHTTPServer) and the ones that do not is that the latter are talking to a server running in docker.
When we kill that docker container, our code is left with a connection open to the docker proxy that is set up by docker/docker-compose, so our code thinks it is still connected to the real server.
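
If a proxy keeps the TCP connection open, the socket itself cannot tell us the real server is gone, so one option is an application-level probe: send a small request and require a reply within a timeout. The sketch below is an assumption about how that could look, not what naz does. `is_alive`, `demo` and the `b"ping"` payload are hypothetical; a real SMPP client would presumably send an `enquire_link` PDU instead. It assumes Python 3.7+ for `asyncio.run`.

```python
import asyncio


async def is_alive(reader, writer, probe=b"ping", timeout=1.0):
    """Send a probe and report whether any reply arrives within `timeout`."""
    try:
        writer.write(probe)
        await writer.drain()
        reply = await asyncio.wait_for(reader.read(len(probe)), timeout=timeout)
    except (asyncio.TimeoutError, OSError):
        return False  # silent peer, reset connection, or socket error
    return reply != b""  # b"" means the peer closed the connection


async def demo():
    async def echo(reader, writer):
        # Behaves like a healthy server: answers the probe.
        writer.write(await reader.read(100))
        await writer.drain()
        writer.close()

    async def silent(reader, writer):
        # Behaves like a proxy with nothing behind it: accepts bytes, never replies.
        await reader.read()
        writer.close()

    async def probe_server(handler):
        server = await asyncio.start_server(handler, "127.0.0.1", 0)
        port = server.sockets[0].getsockname()[1]
        reader, writer = await asyncio.open_connection("127.0.0.1", port)
        try:
            return await is_alive(reader, writer, timeout=0.5)
        finally:
            writer.close()
            server.close()

    return await probe_server(echo), await probe_server(silent)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The timeout is the important part: without it, a read against a proxy that never answers would wait forever.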

@komuw
Owner Author

komuw commented May 30, 2019

For the docker examples, if I restart docker itself, I do get a ConnectionResetError.

@komuw komuw mentioned this issue May 31, 2019
komuw added a commit that referenced this issue May 31, 2019
What:
- Introduce a configurable timeout when trying to connect to SMSC
- Try and detect when the connection to SMSC is disconnected and attempt to re-connect & re-bind
- bugfix; `asyncio.streams.StreamWriter.drain` should not be called concurrently by multiple coroutines[1]
- when shutting down, `naz` now tries to make sure that write buffers are properly flushed[2][3]
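
The first two items above (a connect timeout plus re-connecting) could be sketched roughly as follows. This is not the code from the commit. `connect_with_retry` and its parameters are hypothetical names, and the exponential backoff is an assumption made for illustration.

```python
import asyncio


async def connect_with_retry(host, port, attempts=3, timeout=1.0, delay=0.1):
    """Try to open a connection, retrying with exponential backoff.

    Returns the (reader, writer) pair from asyncio.open_connection, or
    raises ConnectionError once every attempt has failed.
    """
    last_error = None
    for attempt in range(attempts):
        try:
            return await asyncio.wait_for(
                asyncio.open_connection(host, port), timeout=timeout
            )
        except (asyncio.TimeoutError, OSError) as e:
            last_error = e
            # Back off before the next attempt: delay, 2*delay, 4*delay, ...
            await asyncio.sleep(delay * (2 ** attempt))
    raise ConnectionError(
        "could not connect after {0} attempts".format(attempts)
    ) from last_error
```

A caller would await `connect_with_retry(smsc_host, smsc_port)` wherever a lost connection is detected, then redo any protocol-level bind on the new connection.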

Why:
- Make `naz` more failure tolerant
- Fixes: #67
- Fixes: #114
- Fixes: #116
- Fixes: #117
- Fixes: #120

References:
1. https://bugs.python.org/issue29930
2. https://vorpus.org/blog/some-thoughts-on-asynchronous-api-design-in-a-post-asyncawait-world/
3. aio-libs/aiohttp#1369
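
For the `drain` bugfix mentioned in the commit message, one common approach is to serialize `write` + `drain` behind an `asyncio.Lock` so that only one coroutine waits in `drain()` at a time. The sketch below illustrates that idea and is not the code from the commit. `LockedWriter` and `demo` are hypothetical names, and it assumes Python 3.7+ for `asyncio.run`.

```python
import asyncio


class LockedWriter:
    """Wrap a StreamWriter so write+drain is never run concurrently."""

    def __init__(self, writer):
        self._writer = writer
        self._lock = asyncio.Lock()

    async def send(self, data):
        # Only one coroutine at a time may write and then wait for the flush.
        async with self._lock:
            self._writer.write(data)
            await self._writer.drain()


async def demo():
    received = bytearray()
    done = asyncio.Event()

    async def collect(reader, writer):
        received.extend(await reader.read())  # read until the client closes
        writer.close()
        done.set()

    server = await asyncio.start_server(collect, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    _, writer = await asyncio.open_connection("127.0.0.1", port)

    safe = LockedWriter(writer)
    # Ten coroutines send concurrently; the lock queues them one by one.
    await asyncio.gather(*(safe.send(b"x" * 1000) for _ in range(10)))

    writer.close()
    await asyncio.wait_for(done.wait(), timeout=2)
    server.close()
    return len(received)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The lock is created inside a running coroutine here, which avoids binding it to the wrong event loop on older Python versions.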