Connection lost on await writer.drain() #67

Closed
komuw opened this issue Jan 18, 2019 · 7 comments · Fixed by #113
komuw commented Jan 18, 2019

This program

import asyncio
import os

os.environ["PYTHONASYNCIODEBUG"] = "1"

loop = asyncio.get_event_loop()


async def client():
    reader, writer = await asyncio.open_connection("35.173.6.94", 80, loop=loop)
    print("\nconnected to 35.173.6.94:80")
    while True:
        print("writer.transport._conn_lost", writer.transport._conn_lost)
        if writer.transport._conn_lost:
            writer.close()
            reader, writer = await asyncio.open_connection("35.173.6.94", 80, loop=loop)

        req = b"hello"
        writer.write(req)
        await writer.drain()
        # import pdb

        # pdb.set_trace()

        print("\nsent request\n")

        data = await reader.read(2)
        print("\n\nread:\n")
        print(data)


loop.run_until_complete(client())

produces the error:

read:

b''
writer.transport._conn_lost 0
Executing <Handle <TaskWakeupMethWrapper object at 0x1053433d8>(<Future finis...events.py:377>) created at /usr/local/Cellar/python/3.7.0/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/streams.py:408> took 1.974 seconds
Traceback (most recent call last):
  File "smpp/debug.py", line 35, in <module>
    loop.run_until_complete(client())
  File "/usr/local/Cellar/python/3.7.0/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 568, in run_until_complete
    return future.result()
  File "smpp/debug.py", line 23, in client
    await writer.drain()
  File "/usr/local/Cellar/python/3.7.0/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/streams.py", line 348, in drain
    await self._protocol._drain_helper()
  File "/usr/local/Cellar/python/3.7.0/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/streams.py", line 202, in _drain_helper
    raise ConnectionResetError('Connection lost')
ConnectionResetError: Connection lost
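The traceback shows `_drain_helper` raising `ConnectionResetError` from inside `drain()`. Rather than polling the private `writer.transport._conn_lost` attribute before draining, the caller can catch the exception and reconnect. A minimal sketch; `reconnect` here is a caller-supplied stand-in coroutine, not an asyncio or naz API:

```python
import asyncio


async def drain_or_reconnect(writer, reconnect):
    # Try to flush the write buffer; if the connection was lost,
    # close the dead writer and ask the caller-supplied `reconnect`
    # coroutine for a fresh (reader, writer) pair.
    try:
        await writer.drain()
        return None  # drain succeeded; keep using the current writer
    except ConnectionResetError:
        writer.close()
        return await reconnect()
```

This sidesteps the race between checking `_conn_lost` and calling `drain()`, since the lost connection is handled at the point where asyncio actually reports it.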
komuw commented Jan 18, 2019

This program, though, seems to work:

import asyncio
import os

os.environ["PYTHONASYNCIODEBUG"] = "1"

loop = asyncio.get_event_loop()


async def client():
    reader, writer = await asyncio.open_connection("35.173.6.94", 80, loop=loop)
    print("\nconnected to 35.173.6.94:80")
    while True:

        req = b"hello"
        writer.write(req)
        print("writer.transport._conn_lost", writer.transport._conn_lost)
        if writer.transport._conn_lost:
            writer.close()
            reader, writer = await asyncio.open_connection("35.173.6.94", 80, loop=loop)
        await writer.drain()
        # import pdb

        # pdb.set_trace()

        print("\nsent request\n")

        data = await reader.read(2)
        print("\n\nread:\n")
        print(data)


loop.run_until_complete(client())

i.e. this part seems to be key:

        # Do write, then check if connection is lost just before calling drain
        writer.write(req)
        print("writer.transport._conn_lost", writer.transport._conn_lost)
        if writer.transport._conn_lost:
            writer.close()
            reader, writer = await asyncio.open_connection("35.173.6.94", 80, loop=loop)
        await writer.drain()

This program works and stays up longer than the previous one.
Then at some point it fails with a BrokenPipeError:

writer.transport._conn_lost 0
Executing <Handle <TaskWakeupMethWrapper object at 0x10eed4d98>(<Future finis...events.py:377>) created at /usr/local/Cellar/python/3.7.0/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/streams.py:182> took 0.371 seconds
Traceback (most recent call last):
  File "smpp/debug.py", line 34, in <module>
    loop.run_until_complete(client())
  File "/usr/local/Cellar/python/3.7.0/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 568, in run_until_complete
    return future.result()
  File "smpp/debug.py", line 22, in client
    await writer.drain()
  File "/usr/local/Cellar/python/3.7.0/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/streams.py", line 348, in drain
    await self._protocol._drain_helper()
  File "/usr/local/Cellar/python/3.7.0/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/streams.py", line 209, in _drain_helper
    await waiter
  File "/usr/local/Cellar/python/3.7.0/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/selector_events.py", line 891, in _write_ready
    n = self._sock.send(self._buffer)
BrokenPipeError: [Errno 32] Broken pipe

komuw commented Jan 18, 2019

The same problem shows up in naz; we have been getting connection-lost errors like this one.

body = b""
command_length = 16 + len(body)  # 16 is for headers
command_id = cli.command_ids["enquire_link"]
command_status = 0x00000000
sequence_number = cli.sequence_generator.next_sequence()
header = struct.pack(">IIII", command_length, command_id, command_status, sequence_number)
full_pdu = header + body
loop.run_until_complete(
    cli.send_data(smpp_event="enquire_link", msg=full_pdu, correlation_id="correlation_id3")
)

error:

Traceback (most recent call last):
  File "<stdin>", line 2, in <module>
  File "/usr/local/lib/python3.6/asyncio/base_events.py", line 484, in run_until_complete
    return future.result()
  File "/usr/local/lib/python3.6/site-packages/naz/client.py", line 866, in send_data
    await self.writer.drain()
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 339, in drain
    yield from self._protocol._drain_helper()
  File "/usr/local/lib/python3.6/asyncio/streams.py", line 210, in _drain_helper
    raise ConnectionResetError('Connection lost')
ConnectionResetError: Connection lost

The culprit line:

await self.writer.drain()
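The header packing used above can be sanity-checked in isolation. The values below are illustrative (0x00000015 is the enquire_link command_id per SMPP v3.4, and 2147483669 in the responses below is its resp counterpart 0x80000015):

```python
import struct

body = b""
command_length = 16 + len(body)  # 16 bytes of header + body length
command_id = 0x00000015          # enquire_link, SMPP v3.4
command_status = 0x00000000
sequence_number = 1

# An SMPP header is four big-endian unsigned 32-bit integers.
header = struct.pack(">IIII", command_length, command_id, command_status, sequence_number)
full_pdu = header + body

assert len(header) == 16
# Round-trip: unpacking recovers the original fields.
assert struct.unpack(">IIII", full_pdu[:16]) == (16, 0x15, 0, 1)
```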

komuw commented Jan 18, 2019

To TRY and manually fix the issue with naz above, this code attempts that:

### this alone with NO `cli.tranceiver_bind()`
reader, writer = loop.run_until_complete(cli.connect()) 

### make SURE the session state is the right one.
cli.current_session_state = "BOUND_TRX" 

## send `enquire_link`
loop.run_until_complete(
    cli.send_data(smpp_event="enquire_link", msg=full_pdu, correlation_id="correlation_id3")
)

## read `enquire_link_resp` and print it
full_enquire_link_resp_pdu_data = receive_data()
parsed_enquire_link_resp = parse_response_pdu(full_enquire_link_resp_pdu_data)
print("\n parsed_enquire_link_resp: \n")
print(parsed_enquire_link_resp)

prints

{
'correlation_id': None, 
'command_id': 2147483669, 
'command_status': 4, 
'smpp_event': 'enquire_link_resp', 
'command_status_name': 'ESME_RINVBNDSTS', 
'command_status_value': CommandStatus(code=4, description='Incorrect BIND Status for given command'), 
'command_status_code': 4, 
'command_status_value_description': 'Incorrect BIND Status for given command'
}

Inference:
Incorrect BIND Status for given command; this is happening because:

  • the connection dropped, and thus the SMSC updated its session state to CLOSED[1]
  • we re-connect to the SMSC (via cli.connect()), and the SMSC updates its session state to OPEN[1]
  • we send an enquire_link request. The SMSC sends an enquire_link_resp with a failure status, since its session state is still OPEN.
  • What we want is to send the enquire_link request when the SMSC session state is BOUND_TRX[1].
    For that to happen, we need to send a cli.tranceiver_bind() straight after cli.connect()

ref:

  1. section 2.2 of SMPP specification document v3.4
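The transitions described above can be captured in a tiny table, so that sending a PDU in the wrong state fails locally instead of coming back as ESME_RINVBNDSTS from the SMSC. This is a sketch of the idea (a simplified subset of section 2.2 of SMPP v3.4), not the naz state machine:

```python
# Allowed session-state transitions (simplified subset of SMPP v3.4, section 2.2).
TRANSITIONS = {
    "CLOSED": {"OPEN"},               # network connect
    "OPEN": {"BOUND_TRX", "CLOSED"},  # bind_transceiver, or disconnect
    "BOUND_TRX": {"CLOSED"},          # unbind / connection lost
}


def move(state, new_state):
    # Refuse transitions the spec does not allow.
    if new_state not in TRANSITIONS.get(state, set()):
        raise ValueError(f"illegal transition {state} -> {new_state}")
    return new_state


def can_enquire_link(state):
    # enquire_link is only valid once bound.
    return state == "BOUND_TRX"
```

With such a table, the failing sequence above (connect, then enquire_link without binding) would be rejected client-side before any bytes hit the wire.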

komuw commented Jan 18, 2019

This manual fix resolves that issue:

### connect
reader, writer = loop.run_until_complete(cli.connect()) 

### send bind_transceiver
loop.run_until_complete(
    cli.send_data(smpp_event="bind_transceiver", msg=full_pdu, correlation_id="correlation_id1")
)
### read bind_transceiver_resp
full_tranceiver_bind_resp_pdu_data = receive_data()
parsed_tranceiver_bind_resp = parse_response_pdu(full_tranceiver_bind_resp_pdu_data)


## send `enquire_link`
loop.run_until_complete(
    cli.send_data(smpp_event="enquire_link", msg=full_pdu, correlation_id="correlation_id3")
)

## read `enquire_link_resp` and print it
full_enquire_link_resp_pdu_data = receive_data()
parsed_enquire_link_resp = parse_response_pdu(full_enquire_link_resp_pdu_data)
print("\n parsed_enquire_link_resp: \n")
print(parsed_enquire_link_resp)

prints

{
'correlation_id': None, 
'command_id': 2147483669, 
'command_status': 0, 
'smpp_event': 'enquire_link_resp', 
'command_status_name': 'ESME_ROK', 
'command_status_value': CommandStatus(code=0, description='Success'), 
'command_status_code': 0, 
'command_status_value_description': 'Success'
}

@komuw komuw added the bug Something isn't working label Jan 18, 2019
@komuw komuw self-assigned this Jan 18, 2019
komuw commented Jan 18, 2019

We need to continuously check whether the connection is broken (how? maybe via cli.writer.transport._conn_lost) and, if it is lost, then do:

  • cli.connect()
  • cli.tranceiver_bind()

We should also tighten up the session states (and their transitions) -> #52
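One possible shape for that continuous check is a small watchdog coroutine. `cli.connect()` and `cli.tranceiver_bind()` are the methods used in the snippets above; the use of the private `_conn_lost` attribute and `is_closing()` is an assumption about how to detect a dead transport, not a settled design:

```python
import asyncio


async def watchdog(cli, interval=5.0):
    # Periodically inspect the transport; on a lost connection,
    # re-connect and re-bind before the next send happens.
    while True:
        transport = cli.writer.transport
        if transport.is_closing() or transport._conn_lost:
            await cli.connect()
            await cli.tranceiver_bind()
        await asyncio.sleep(interval)
```

It would run alongside the send/receive loops as a background task, so the connect-then-bind pair is always performed together, fixing the ESME_RINVBNDSTS failure mode described above.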

komuw commented Jan 19, 2019

workaround:

One reason why you may experience Connection lost errors is session timers[1][2].
For example, if an SMSC has a very short inactivity_timer[2], the SMSC may be closing the session (and the connection) when that inactivity_timer expires.

One way to work around this is to set a low enquire_link_interval on your naz client;

import asyncio
import naz

loop = asyncio.get_event_loop()
outboundqueue = naz.q.SimpleOutboundQueue(maxsize=1000, loop=loop)
cli = naz.Client(
    async_loop=loop,
    smsc_host="127.0.0.1",
    smsc_port=2775,
    system_id="smppclient1",
    password="password",
    enquire_link_interval=5,
)

or if using the naz command line app;

{
    "smsc_host": "127.0.0.1",
    "smsc_port": 5004,
    "system_id": "SomeID",
    "password": "somePassword",
    "enquire_link_interval": 5
}

where the value you choose for enquire_link_interval is less than the inactivity_timer value set on the SMSC.

That way, the inactivity_timer will never expire on the SMSC side.

ref:

  1. implement SMPP timers that concern ESME #73
  2. see; sections 2.9 and 7.2 of SMPP specification document v3.4

komuw commented May 28, 2019

How to reproduce, with the current head at;

  1. docker-compose up
  2. naz-cli --config examples/example_config.json
  3. python examples/example_klasses.py
  4. kill the smpp_server container

@komuw komuw mentioned this issue May 28, 2019
komuw added a commit that referenced this issue May 31, 2019
What:
- Introduce a configurable timeout when trying to connect to SMSC
- Try and detect when the connection to SMSC is disconnected and attempt to re-connect & re-bind
- bugfix; `asyncio.streams.StreamWriter.drain` should not be called concurrently by multiple coroutines[1]
- when shutting down, `naz` now tries to make sure that write buffers are properly flushed[2][3]

Why:
- Make `naz` more failure tolerant
- Fixes: #67
- Fixes: #114
- Fixes: #116
- Fixes: #117
- Fixes: #120

References:
1. https://bugs.python.org/issue29930
2. https://vorpus.org/blog/some-thoughts-on-asynchronous-api-design-in-a-post-asyncawait-world/
3. aio-libs/aiohttp#1369
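Reference [1] above (bpo-29930) concerns `StreamWriter.drain` not being safe to call concurrently from multiple coroutines. A common workaround is to serialize the write+drain pair behind an `asyncio.Lock`; this is a sketch of that pattern, not the actual naz implementation:

```python
import asyncio


class SafeStreamWriter:
    # Wraps a StreamWriter so that write+drain is never executed
    # concurrently by multiple coroutines (see bpo-29930).
    def __init__(self, writer):
        self._writer = writer
        self._lock = asyncio.Lock()

    async def send(self, data):
        async with self._lock:
            self._writer.write(data)
            await self._writer.drain()
```

Each coroutine awaits the lock, so one coroutine's drain always completes before the next coroutine's write begins.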