From 20511f8429a4e541a1e76de60f6c2eb8de4e7702 Mon Sep 17 00:00:00 2001 From: Adam Liddell Date: Sat, 28 Mar 2020 00:41:22 +0000 Subject: [PATCH 1/4] Ensure pool connection is released in acquire when cancelled Closes #547 When wait_for is cancelled, there is a chance that the waited task has already been completed, leaving the connection looking like it is in use. This fix ensures that the connection is returned to the pool in this situation. For context, see: https://bugs.python.org/issue37658 https://github.com/MagicStack/asyncpg/issues/467 --- asyncpg/pool.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/asyncpg/pool.py b/asyncpg/pool.py index 20a3234e..cd53e308 100644 --- a/asyncpg/pool.py +++ b/asyncpg/pool.py @@ -605,15 +605,29 @@ async def _acquire_impl(): ch._timeout = timeout return proxy + def _release_leaked_connection(fut): + if not fut.cancelled(): + asyncio.ensure_future(self.release(fut.result())) + if self._closing: raise exceptions.InterfaceError('pool is closing') self._check_init() + acquire_fut = asyncio.ensure_future(_acquire_impl()) if timeout is None: - return await _acquire_impl() + return await acquire_fut else: - return await asyncio.wait_for( - _acquire_impl(), timeout=timeout) + try: + return await asyncio.wait_for( + acquire_fut, timeout=timeout) + except asyncio.CancelledError: + # Ensure connection is marked as not in use. + # The cancellation may have raced the acquire, leading + # to the acquire completing but the wait_for to be + # cancelled. + # See: https://bugs.python.org/issue37658 + acquire_fut.add_done_callback(_release_leaked_connection) + raise async def release(self, connection, *, timeout=None): """Release a database connection back to the pool. 
From 2646c8136379ef02a6c4ea36cb0416a79a869914 Mon Sep 17 00:00:00 2001 From: Adam Liddell Date: Sat, 28 Mar 2020 00:47:28 +0000 Subject: [PATCH 2/4] Update other use of wait_for workaround Added issue link and comment describing why this workaround is necessary. Also moved the function to be local to the call site, as it is not used elsewhere. --- asyncpg/connect_utils.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/asyncpg/connect_utils.py b/asyncpg/connect_utils.py index ec3d1090..efa1edcf 100644 --- a/asyncpg/connect_utils.py +++ b/asyncpg/connect_utils.py @@ -615,12 +615,23 @@ async def _connect_addr(*, addr, loop, timeout, params, config, else: connector = loop.create_connection(proto_factory, *addr) + def _close_leaked_connection(fut): + if not fut.cancelled(): + tr, pr = fut.result() + if tr: + tr.close() + connector = asyncio.ensure_future(connector) before = time.monotonic() try: tr, pr = await asyncio.wait_for( connector, timeout=timeout) except asyncio.CancelledError: + # Ensure connection is closed. + # The cancellation may have raced the connect, leading + # to the connect completing but the wait_for to be + # cancelled. 
+ # See: https://bugs.python.org/issue37658 connector.add_done_callback(_close_leaked_connection) raise timeout -= time.monotonic() - before @@ -721,12 +732,3 @@ def _create_future(loop): return asyncio.Future(loop=loop) else: return create_future() - - -def _close_leaked_connection(fut): - try: - tr, pr = fut.result() - if tr: - tr.close() - except asyncio.CancelledError: - pass # hide the exception From 6f22ad8e33e4dc0d4bd07f495ab98dea0eed4e07 Mon Sep 17 00:00:00 2001 From: Adam Liddell Date: Sat, 28 Mar 2020 13:12:53 +0000 Subject: [PATCH 3/4] Add test for acquire connection leak --- tests/test_pool.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/tests/test_pool.py b/tests/test_pool.py index e51923e4..9857dceb 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -379,6 +379,26 @@ async def worker(): self.cluster.trust_local_connections() self.cluster.reload() + async def test_pool_handles_task_cancel_in_acquire_with_timeout(self): + # See https://github.com/MagicStack/asyncpg/issues/547 + pool = await self.create_pool(database='postgres', + min_size=1, max_size=1) + + async def worker(): + async with pool.acquire(timeout=100): + pass + + # Schedule task + task = self.loop.create_task(worker()) + # Yield to task, but cancel almost immediately + await asyncio.sleep(0.00000000001) + # Cancel the worker. + task.cancel() + # Wait to make sure the cleanup has completed. + await asyncio.sleep(0.4) + # Check that the connection has been returned to the pool. + self.assertEqual(pool._queue.qsize(), 1) + async def test_pool_handles_task_cancel_in_release(self): # Use SlowResetConnectionPool to simulate # the Task.cancel() and __aexit__ race. 
From 51b8d7c1a15a1689d74d3f99745db3ab3131b472 Mon Sep 17 00:00:00 2001 From: Adam Liddell Date: Sat, 2 May 2020 22:19:41 +0100 Subject: [PATCH 4/4] Use asyncio.wait_for in pool acquire regardless of timeout When wait_for is called with timeout=None, it runs without a timeout, as desired --- asyncpg/pool.py | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/asyncpg/pool.py b/asyncpg/pool.py index cd53e308..ed9ccff8 100644 --- a/asyncpg/pool.py +++ b/asyncpg/pool.py @@ -614,20 +614,19 @@ def _release_leaked_connection(fut): self._check_init() acquire_fut = asyncio.ensure_future(_acquire_impl()) - if timeout is None: - return await acquire_fut - else: - try: - return await asyncio.wait_for( - acquire_fut, timeout=timeout) - except asyncio.CancelledError: - # Ensure connection is marked as not in use. - # The cancellation may have raced the acquire, leading - # to the acquire completing but the wait_for to be - # cancelled. - # See: https://bugs.python.org/issue37658 - acquire_fut.add_done_callback(_release_leaked_connection) - raise + try: + # Calling wait_for with timeout=None will shortcut to run without + # timeout + return await asyncio.wait_for( + acquire_fut, timeout=timeout) + except asyncio.CancelledError: + # Ensure connection is marked as not in use. + # The cancellation may have raced the acquire, leading + # to the acquire completing but the wait_for to be + # cancelled. + # See: https://bugs.python.org/issue37658 + acquire_fut.add_done_callback(_release_leaked_connection) + raise async def release(self, connection, *, timeout=None): """Release a database connection back to the pool.