Reconnecting fails with ConnectionResetError #127

Open
nils-borrmann-tacto opened this issue Nov 9, 2024 · 4 comments

Comments

@nils-borrmann-tacto
Contributor

nils-borrmann-tacto commented Nov 9, 2024

I'm connecting to ClickHouse using the asynch Pool. If I let the connection idle for a few minutes, the server apparently closes it on its end. The next time I execute a query through the existing pool, asynch gets a ConnectionResetError during ping(). That error is still handled gracefully, but when reconnecting, asynch tries to close the already closed connection and crashes, again with a ConnectionResetError. (Side note: sometimes I get a BrokenPipeError instead of a ConnectionResetError.)

As a quick-and-dirty fix, I wrapped self.writer.close() in a try-except. Another option could be to set connection.connected = False in the error handling inside ping().
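
A minimal sketch of both workarounds, not asynch's actual code: `FakeWriter` and `SketchConnection` are hypothetical names used only for illustration. It assumes the writer behaves like an `asyncio.StreamWriter` whose `wait_closed()` re-raises the connection error, as the traceback in this report suggests.

```python
import asyncio


class FakeWriter:
    """Hypothetical stand-in for a StreamWriter whose peer already reset the socket."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True

    async def wait_closed(self):
        # Mimic asyncio re-raising the transport error while closing.
        raise ConnectionResetError(54, "Connection reset by peer")


class SketchConnection:
    """Hypothetical connection wrapper; not asynch's real Connection class."""

    def __init__(self, writer):
        self.writer = writer
        self.connected = True

    async def ping(self):
        try:
            # A real ping would write a packet and read the reply; the dead
            # socket is simulated here by raising directly.
            raise BrokenPipeError(32, "Broken pipe")
        except (ConnectionResetError, BrokenPipeError):
            # Option 2: record that the socket is gone.
            self.connected = False
            return False

    async def disconnect(self):
        try:
            # Option 1: closing an already dead socket must not crash.
            self.writer.close()
            await self.writer.wait_closed()
        except (ConnectionResetError, BrokenPipeError):
            pass
        finally:
            self.connected = False


async def demo():
    conn = SketchConnection(FakeWriter())
    alive = await conn.ping()
    await conn.disconnect()  # does not raise
    return alive, conn.connected, conn.writer.closed


result = asyncio.run(demo())
```

Catching only `ConnectionResetError` and `BrokenPipeError` keeps unrelated failures visible; whether that is the right scope for the library is a judgment call for the maintainers.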

Demo code:

import logging

from asynch import Pool
import asyncio

logging.basicConfig()
logging.getLogger("asynch.proto.connection").setLevel(logging.DEBUG)

connection_string = 'clickhouse://@p4c0wmo0db.germanywestcentral.azure.clickhouse.cloud:9440/production?secure=True'

loop = asyncio.new_event_loop()


async def init_pool():
    pool = Pool(minsize=1, maxsize=2, dsn=connection_string)
    await pool.startup()
    return pool


async def execute(pool):
    async with pool.connection() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute("SELECT 1")
            ret = await cursor.fetchone()
            assert ret == (1,)


pool = loop.run_until_complete(init_pool())
loop.run_until_complete(execute(pool))
# wait five minutes
loop.run_until_complete(execute(pool))

And output:

DEBUG:asynch.proto.connection:Connecting. Database: production. User: production_tacto_api_user
DEBUG:asynch.proto.connection:Connecting to p4c0wmo0db.germanywestcentral.azure.clickhouse.cloud:9440
DEBUG:asynch.proto.connection:Connected to ClickHouse server version 24.6.1, revision: 54468
DEBUG:asynch.proto.connection:Query: SELECT 1
# wait five minutes
DEBUG:asynch.proto.connection:Socket closed
Traceback (most recent call last):
  File "/Users/nilsborrmann/PycharmProjects/pythonProject/.venv/lib/python3.12/site-packages/asynch/proto/connection.py", line 309, in ping
    packet_type = await self.reader.read_varint()
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nilsborrmann/PycharmProjects/pythonProject/.venv/lib/python3.12/site-packages/asynch/proto/streams/buffered.py", line 139, in read_varint
    if not (await self._is_buffer_readable()):
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nilsborrmann/PycharmProjects/pythonProject/.venv/lib/python3.12/site-packages/asynch/proto/streams/buffered.py", line 117, in _is_buffer_readable
    await self._refill_buffer()
  File "/Users/nilsborrmann/PycharmProjects/pythonProject/.venv/lib/python3.12/site-packages/asynch/proto/streams/buffered.py", line 111, in _refill_buffer
    await self._read_into_buffer()
  File "/Users/nilsborrmann/PycharmProjects/pythonProject/.venv/lib/python3.12/site-packages/asynch/proto/streams/buffered.py", line 127, in _read_into_buffer
    packet = await self.reader.read(self.buffer_max_size)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nilsborrmann/.pyenv/versions/3.12.7/lib/python3.12/asyncio/streams.py", line 713, in read
    await self._wait_for_data('read')
  File "/Users/nilsborrmann/.pyenv/versions/3.12.7/lib/python3.12/asyncio/streams.py", line 545, in _wait_for_data
    await self._waiter
  File "/Users/nilsborrmann/.pyenv/versions/3.12.7/lib/python3.12/asyncio/selector_events.py", line 988, in _read_ready__get_buffer
    nbytes = self._sock.recv_into(buf)
             ^^^^^^^^^^^^^^^^^^^^^^^^^
ConnectionResetError: [Errno 54] Connection reset by peer
WARNING:asynch.proto.connection:Connection was closed, reconnecting.
Traceback (most recent call last):
  File "<input>", line 1, in <module>
  File "/Users/nilsborrmann/.pyenv/versions/3.12.7/lib/python3.12/asyncio/base_events.py", line 687, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "<input>", line 23, in execute
  File "/Users/nilsborrmann/PycharmProjects/pythonProject/.venv/lib/python3.12/site-packages/asynch/cursors.py", line 88, in execute
    response = await execute(query, args=args, with_column_types=True, **execute_kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nilsborrmann/PycharmProjects/pythonProject/.venv/lib/python3.12/site-packages/asynch/proto/connection.py", line 630, in execute
    and types.
 ^^^^^^^^^^^^^^
  File "/Users/nilsborrmann/PycharmProjects/pythonProject/.venv/lib/python3.12/site-packages/asynch/proto/context.py", line 53, in __aenter__
    await self._connection.disconnect()
  File "/Users/nilsborrmann/PycharmProjects/pythonProject/.venv/lib/python3.12/site-packages/asynch/proto/connection.py", line 565, in disconnect
    try:
    ^^^^^
  File "/Users/nilsborrmann/PycharmProjects/pythonProject/.venv/lib/python3.12/site-packages/asynch/proto/streams/buffered.py", line 64, in close
    await self.writer.wait_closed()
  File "/Users/nilsborrmann/.pyenv/versions/3.12.7/lib/python3.12/asyncio/streams.py", line 364, in wait_closed
    await self._protocol._get_close_waiter(self)
ConnectionResetError: [Errno 54] Connection reset by peer

% pip freeze
asynch==0.2.5
ciso8601==2.3.1
leb128==1.0.8
lz4==4.3.3
pytz==2024.2
tzlocal==5.2
zstd==1.5.5.1
@stankudrow
Contributor

stankudrow commented Nov 9, 2024

@nils-borrmann-tacto, hello.

This example does not reproduce the problem you encountered; locally, two runs went fine.

Example:

import asyncio
import logging

from asynch import Pool


logging.basicConfig(
    level=logging.DEBUG, format="Example %(asctime)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
)

# connection_string ='clickhouse://@p4c0wmo0db.germanywestcentral.azure.clickhouse.cloud:9440/production?secure=True'


async def init_pool() -> Pool:
    pool = Pool(minsize=1, maxsize=2)  # , dsn=connection_string)
    await pool.startup()
    return pool


async def execute(pool: Pool):
    async with pool.connection() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute("SELECT 1")
            ret = await cursor.fetchone()
            assert ret == (1,)


async def main():
    pool = await init_pool()
    await execute(pool)
    # Wait 5 minutes without blocking the event loop.
    await asyncio.sleep(5 * 60)
    await execute(pool)
    await pool.shutdown()


if __name__ == "__main__":
    asyncio.run(main())

Result (kkk):

[Screenshot: asynch_reconn_example]

Could you try this example locally and then with the original connection string?

ciso8601==2.3.1
...
clickhouse-cityhash==1.0.2.4
clickhouse-driver==0.2.9
...
leb128==1.0.8
lz4==4.3.3
...
pytz==2024.1
tzlocal==5.2
...
uvloop==0.20.0
zstd==1.5.5.1

@nils-borrmann-tacto
Contributor Author

nils-borrmann-tacto commented Nov 10, 2024

Hi @stankudrow, thanks for the quick reply.

In your run, it didn't print this line:

WARNING:asynch.proto.connection:Connection was closed, reconnecting.

So it didn't run into the crash, because the server didn't close the connection and asynch was able to reuse the existing connection just fine. The crash only happens when a previously opened connection cannot be reused because the server closed it.

I am using ClickHouse Cloud, and a local ClickHouse installation likely does not replicate this behaviour. I will see if I can reproduce the issue locally, maybe by killing the locally running ClickHouse instance.

@stankudrow
Contributor

Yep, such an example would be nice to have as a unit test, but I am not sure I can compose one today.
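
A minimal sketch of one way to simulate a server that drops the connection, assuming a plain asyncio TCP server is an acceptable stand-in for ClickHouse. `drop_client` and `observe_server_close` are hypothetical names, and this is not the reproduction from #129.

```python
import asyncio


async def drop_client(reader, writer):
    # Hypothetical server handler: accept the connection, then close it
    # right away, standing in for a server that drops idle clients.
    writer.close()
    await writer.wait_closed()


async def observe_server_close():
    # Port 0 lets the OS pick a free port.
    server = await asyncio.start_server(drop_client, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        reader, writer = await asyncio.open_connection("127.0.0.1", port)
        # A clean close by the peer shows up as EOF (empty bytes); a hard
        # reset would raise ConnectionResetError instead.
        try:
            data = await asyncio.wait_for(reader.read(1), timeout=5)
        except ConnectionResetError:
            data = b""
        # Tear down the client side, tolerating an already dead socket.
        try:
            writer.close()
            await writer.wait_closed()
        except (ConnectionResetError, BrokenPipeError):
            pass
        return data


data = asyncio.run(observe_server_close())
```

A clean close like this one yields EOF rather than the `ConnectionResetError` from the report, so a faithful test would still need to trigger the reset path on the client side.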

@nils-borrmann-tacto
Contributor Author

I managed to reproduce this error locally: #129
