diff --git a/asyncpg/connect_utils.py b/asyncpg/connect_utils.py index 2678b358..c2f0901b 100644 --- a/asyncpg/connect_utils.py +++ b/asyncpg/connect_utils.py @@ -626,12 +626,23 @@ async def _connect_addr(*, addr, loop, timeout, params, config, else: connector = loop.create_connection(proto_factory, *addr) + def _close_leaked_connection(fut): + if not fut.cancelled(): + tr, pr = fut.result() + if tr: + tr.close() + connector = asyncio.ensure_future(connector) before = time.monotonic() try: tr, pr = await asyncio.wait_for( connector, timeout=timeout) except asyncio.CancelledError: + # Ensure connection is closed. + # The cancellation may have raced the connect, leading + # to the connect completing but the wait_for to be + # cancelled. + # See: https://bugs.python.org/issue37658 connector.add_done_callback(_close_leaked_connection) raise timeout -= time.monotonic() - before @@ -732,12 +743,3 @@ def _create_future(loop): return asyncio.Future(loop=loop) else: return create_future() - - -def _close_leaked_connection(fut): - try: - tr, pr = fut.result() - if tr: - tr.close() - except asyncio.CancelledError: - pass # hide the exception diff --git a/asyncpg/pool.py b/asyncpg/pool.py index 20a3234e..ed9ccff8 100644 --- a/asyncpg/pool.py +++ b/asyncpg/pool.py @@ -605,15 +605,28 @@ async def _acquire_impl(): ch._timeout = timeout return proxy + def _release_leaked_connection(fut): + if not fut.cancelled(): + asyncio.ensure_future(self.release(fut.result())) + if self._closing: raise exceptions.InterfaceError('pool is closing') self._check_init() - if timeout is None: - return await _acquire_impl() - else: + acquire_fut = asyncio.ensure_future(_acquire_impl()) + try: + # Calling wait_for with timeout=None will shortcut to run without + # timeout return await asyncio.wait_for( - _acquire_impl(), timeout=timeout) + acquire_fut, timeout=timeout) + except asyncio.CancelledError: + # Ensure connection is marked as not in use. + # The cancellation may have raced the acquire, leading + # to the acquire completing but the wait_for to be + # cancelled. + # See: https://bugs.python.org/issue37658 + acquire_fut.add_done_callback(_release_leaked_connection) + raise async def release(self, connection, *, timeout=None): """Release a database connection back to the pool. diff --git a/tests/test_pool.py b/tests/test_pool.py index e51923e4..9857dceb 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -379,6 +379,26 @@ async def worker(): self.cluster.trust_local_connections() self.cluster.reload() + async def test_pool_handles_task_cancel_in_acquire_with_timeout(self): + # See https://github.com/MagicStack/asyncpg/issues/547 + pool = await self.create_pool(database='postgres', + min_size=1, max_size=1) + + async def worker(): + async with pool.acquire(timeout=100): + pass + + # Schedule task + task = self.loop.create_task(worker()) + # Yield to task, but cancel almost immediately + await asyncio.sleep(0.00000000001) + # Cancel the worker. + task.cancel() + # Wait to make sure the cleanup has completed. + await asyncio.sleep(0.4) + # Check that the connection has been returned to the pool. + self.assertEqual(pool._queue.qsize(), 1) + async def test_pool_handles_task_cancel_in_release(self): # Use SlowResetConnectionPool to simulate # the Task.cancel() and __aexit__ race.