Skip to content

Commit 2bac166

Browse files
elpransaaliddell
andcommitted
Add a workaround for bpo-37658
`asyncio.wait_for()` currently has a bug where it raises a `CancelledError` even when the wrapped awaitable has completed. The upstream fix is in python/cpython#21894. This adds a workaround until the aforementioned PR is merged, backported and released. Co-authored-by: Adam Liddell <git@aliddell.com> Fixes: #467 Fixes: #547 Related: #468 Supersedes: #548
1 parent c05d726 commit 2bac166

File tree

4 files changed

+41
-18
lines changed

4 files changed

+41
-18
lines changed

asyncpg/compat.py

+16
Original file line numberDiff line numberDiff line change
@@ -90,3 +90,19 @@ async def wait_closed(stream):
9090
# On Windows wait_closed() sometimes propagates
9191
# ConnectionResetError which is totally unnecessary.
9292
pass
93+
94+
95+
# Workaround for https://bugs.python.org/issue37658
96+
async def wait_for(fut, timeout):
97+
if timeout is None:
98+
return await fut
99+
100+
fut = asyncio.ensure_future(fut)
101+
102+
try:
103+
return await asyncio.wait_for(fut, timeout)
104+
except asyncio.CancelledError:
105+
if fut.done():
106+
return fut.result()
107+
else:
108+
raise

asyncpg/connect_utils.py

+2-16
Original file line numberDiff line numberDiff line change
@@ -636,18 +636,13 @@ async def _connect_addr(
636636

637637
connector = asyncio.ensure_future(connector)
638638
before = time.monotonic()
639-
try:
640-
tr, pr = await asyncio.wait_for(
641-
connector, timeout=timeout)
642-
except asyncio.CancelledError:
643-
connector.add_done_callback(_close_leaked_connection)
644-
raise
639+
tr, pr = await compat.wait_for(connector, timeout=timeout)
645640
timeout -= time.monotonic() - before
646641

647642
try:
648643
if timeout <= 0:
649644
raise asyncio.TimeoutError
650-
await asyncio.wait_for(connected, timeout=timeout)
645+
await compat.wait_for(connected, timeout=timeout)
651646
except (Exception, asyncio.CancelledError):
652647
tr.close()
653648
raise
@@ -745,12 +740,3 @@ def _create_future(loop):
745740
return asyncio.Future(loop=loop)
746741
else:
747742
return create_future()
748-
749-
750-
def _close_leaked_connection(fut):
751-
try:
752-
tr, pr = fut.result()
753-
if tr:
754-
tr.close()
755-
except asyncio.CancelledError:
756-
pass # hide the exception

asyncpg/pool.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import time
1313
import warnings
1414

15+
from . import compat
1516
from . import connection
1617
from . import connect_utils
1718
from . import exceptions
@@ -198,7 +199,7 @@ async def release(self, timeout):
198199
# If the connection is in cancellation state,
199200
# wait for the cancellation
200201
started = time.monotonic()
201-
await asyncio.wait_for(
202+
await compat.wait_for(
202203
self._con._protocol._wait_for_cancellation(),
203204
budget)
204205
if budget is not None:
@@ -623,7 +624,7 @@ async def _acquire_impl():
623624
if timeout is None:
624625
return await _acquire_impl()
625626
else:
626-
return await asyncio.wait_for(
627+
return await compat.wait_for(
627628
_acquire_impl(), timeout=timeout)
628629

629630
async def release(self, connection, *, timeout=None):

tests/test_pool.py

+20
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,26 @@ async def worker():
379379
self.cluster.trust_local_connections()
380380
self.cluster.reload()
381381

382+
async def test_pool_handles_task_cancel_in_acquire_with_timeout(self):
383+
# See https://github.com/MagicStack/asyncpg/issues/547
384+
pool = await self.create_pool(database='postgres',
385+
min_size=1, max_size=1)
386+
387+
async def worker():
388+
async with pool.acquire(timeout=100):
389+
pass
390+
391+
# Schedule task
392+
task = self.loop.create_task(worker())
393+
# Yield to task, but cancel almost immediately
394+
await asyncio.sleep(0.00000000001)
395+
# Cancel the worker.
396+
task.cancel()
397+
# Wait to make sure the cleanup has completed.
398+
await asyncio.sleep(0.4)
399+
# Check that the connection has been returned to the pool.
400+
self.assertEqual(pool._queue.qsize(), 1)
401+
382402
async def test_pool_handles_task_cancel_in_release(self):
383403
# Use SlowResetConnectionPool to simulate
384404
# the Task.cancel() and __aexit__ race.

0 commit comments

Comments
 (0)