Skip to content

Ensure pool connection is released in acquire when cancelled #548

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions asyncpg/connect_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -626,12 +626,23 @@ async def _connect_addr(*, addr, loop, timeout, params, config,
else:
connector = loop.create_connection(proto_factory, *addr)

def _close_leaked_connection(fut):
if not fut.cancelled():
tr, pr = fut.result()
if tr:
tr.close()

connector = asyncio.ensure_future(connector)
before = time.monotonic()
try:
tr, pr = await asyncio.wait_for(
connector, timeout=timeout)
except asyncio.CancelledError:
# Ensure connection is closed.
# The cancellation may have raced the connect, leading
# to the connect completing but the wait_for to be
# cancelled.
# See: https://bugs.python.org/issue37658
connector.add_done_callback(_close_leaked_connection)
raise
timeout -= time.monotonic() - before
Expand Down Expand Up @@ -732,12 +743,3 @@ def _create_future(loop):
return asyncio.Future(loop=loop)
else:
return create_future()


def _close_leaked_connection(fut):
try:
tr, pr = fut.result()
if tr:
tr.close()
except asyncio.CancelledError:
pass # hide the exception
21 changes: 17 additions & 4 deletions asyncpg/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -605,15 +605,28 @@ async def _acquire_impl():
ch._timeout = timeout
return proxy

def _release_leaked_connection(fut):
if not fut.cancelled():
asyncio.ensure_future(self.release(fut.result()))

if self._closing:
raise exceptions.InterfaceError('pool is closing')
self._check_init()

if timeout is None:
return await _acquire_impl()
else:
acquire_fut = asyncio.ensure_future(_acquire_impl())
try:
# Calling wait_for with timeout=None will shortcut to run without
# timeout
return await asyncio.wait_for(
_acquire_impl(), timeout=timeout)
acquire_fut, timeout=timeout)
except asyncio.CancelledError:
# Ensure connection is marked as not in use.
# The cancellation may have raced the acquire, leading
# to the acquire completing but the wait_for to be
# cancelled.
# See: https://bugs.python.org/issue37658
acquire_fut.add_done_callback(_release_leaked_connection)
raise

async def release(self, connection, *, timeout=None):
"""Release a database connection back to the pool.
Expand Down
20 changes: 20 additions & 0 deletions tests/test_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,26 @@ async def worker():
self.cluster.trust_local_connections()
self.cluster.reload()

async def test_pool_handles_task_cancel_in_acquire_with_timeout(self):
# See https://github.com/MagicStack/asyncpg/issues/547
pool = await self.create_pool(database='postgres',
min_size=1, max_size=1)

async def worker():
async with pool.acquire(timeout=100):
pass

# Schedule task
task = self.loop.create_task(worker())
# Yield to task, but cancel almost immediately
await asyncio.sleep(0.00000000001)
# Cancel the worker.
task.cancel()
# Wait to make sure the cleanup has completed.
await asyncio.sleep(0.4)
# Check that the connection has been returned to the pool.
self.assertEqual(pool._queue.qsize(), 1)

async def test_pool_handles_task_cancel_in_release(self):
# Use SlowResetConnectionPool to simulate
# the Task.cancel() and __aexit__ race.
Expand Down