Skip to content

Commit

Permalink
deprecate and remove blocking connection.cancel() method
Browse files Browse the repository at this point in the history
  • Loading branch information
brianmaissy committed Dec 13, 2020
1 parent b5d3490 commit a485878
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 128 deletions.
50 changes: 6 additions & 44 deletions aiopg/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,6 @@ def __init__(
self._timeout = timeout
self._last_usage = self._loop.time()
self._writing = False
self._cancelling = False
self._cancellation_waiter = None
self._echo = echo
self._cursor_instance = None
self._notifies = asyncio.Queue()
Expand Down Expand Up @@ -173,39 +171,19 @@ def _fatal_error(self, message):

def _create_waiter(self, func_name):
if self._waiter is not None:
if self._cancelling:
if not self._waiter.done():
raise RuntimeError('%s() called while connection is '
'being cancelled' % func_name)
else:
raise RuntimeError('%s() called while another coroutine is '
'already waiting for incoming '
'data' % func_name)
raise RuntimeError('%s() called while another coroutine is '
'already waiting for incoming data' % func_name)
self._waiter = self._loop.create_future()
return self._waiter

async def _poll(self, waiter, timeout):
assert waiter is self._waiter, (waiter, self._waiter)
self._ready(self._weakref)

async def cancel():
self._waiter = self._loop.create_future()
self._cancelling = True
self._cancellation_waiter = self._waiter
self._conn.cancel()
if not self._conn.isexecuting():
return
try:
await asyncio.wait_for(self._waiter, timeout)
except psycopg2.extensions.QueryCanceledError:
pass
except asyncio.TimeoutError:
self._close()

try:
await asyncio.wait_for(self._waiter, timeout)
except (asyncio.CancelledError, asyncio.TimeoutError) as exc:
await asyncio.shield(cancel())
await asyncio.shield(self.close())
raise exc
except psycopg2.extensions.QueryCanceledError as exc:
self._loop.call_exception_handler({
Expand All @@ -215,13 +193,7 @@ async def cancel():
})
raise asyncio.CancelledError
finally:
if self._cancelling:
self._cancelling = False
if self._waiter is self._cancellation_waiter:
self._waiter = None
self._cancellation_waiter = None
else:
self._waiter = None
self._waiter = None

def _isexecuting(self):
return self._conn.isexecuting()
Expand Down Expand Up @@ -360,18 +332,8 @@ async def tpc_recover(self):
"tpc_recover cannot be used in asynchronous mode")

async def cancel(self):
"""Cancel the current database operation."""
if self._waiter is None:
return

async def cancel():
self._conn.cancel()
try:
await self._waiter
except psycopg2.extensions.QueryCanceledError:
pass

await asyncio.shield(cancel())
raise psycopg2.ProgrammingError(
"cancel cannot be used in asynchronous mode")

async def reset(self):
raise psycopg2.ProgrammingError(
Expand Down
20 changes: 4 additions & 16 deletions docs/core.rst
Original file line number Diff line number Diff line change
Expand Up @@ -148,24 +148,12 @@ Example::
The readonly property that underlying
:class:`psycopg2.connection` instance.

.. comethod:: cancel(timeout=None)
.. comethod:: cancel()

Cancel current database operation.
.. versionchanged:: 1.2.0

The method interrupts the processing of the current
operation. If no query is being executed, it does nothing. You
can call this function from a different thread than the one
currently executing a database operation, for instance if you
want to cancel a long running query if a button is pushed in the
UI. Interrupting query execution will cause the cancelled method
to raise a :exc:`psycopg2.extensions.QueryCanceledError`. Note that
the termination of the query is not guaranteed to succeed: see
the documentation for |PQcancel|_.

:param float timeout: timeout for cancelling.

.. |PQcancel| replace:: ``PQcancel()``
.. _PQcancel: http://www.postgresql.org/docs/current/static/libpq-cancel.html#LIBPQ-PQCANCEL
Not supported in asynchronous mode (:exc:`psycopg2.ProgrammingError`
is raised).

.. attribute:: dsn

Expand Down
3 changes: 1 addition & 2 deletions tests/test_async_await.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,10 @@ async def test_pool_context_manager_timeout(pg_params, loop):
with cursor_ctx as cursor:
hung_task = cursor.execute('SELECT pg_sleep(10000);')
# start task
fut = loop.create_task(hung_task)
loop.create_task(hung_task)
# sleep for a bit so it gets going
await asyncio.sleep(1)

fut.cancel()
cursor_ctx = await pool.cursor()
with cursor_ctx as cursor:
resp = await cursor.execute('SELECT 42;')
Expand Down
69 changes: 5 additions & 64 deletions tests/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import aiopg
from aiopg.connection import TIMEOUT, Connection
from aiopg.cursor import Cursor
from aiopg.utils import ensure_future

PY_341 = sys.version_info >= (3, 4, 1)

Expand Down Expand Up @@ -253,63 +252,10 @@ async def test_server_version(connect):
assert 0 < conn.server_version


async def test_cancel_noop(connect):
async def test_cancel_not_supported(connect):
conn = await connect()
await conn.cancel()


async def test_cancel_pending_op(connect, loop):
def exception_handler(loop_, context):
assert context['message'] == context['exception'].pgerror
assert context['future'].exception() is context['exception']
assert loop_ is loop

loop.set_exception_handler(exception_handler)
fut = asyncio.Future(loop=loop)

async def inner():
fut.set_result(None)
await cur.execute("SELECT pg_sleep(10)")

conn = await connect()
cur = await conn.cursor()
task = ensure_future(inner(), loop=loop)
await fut
await asyncio.sleep(0.1, loop=loop)
await conn.cancel()

with pytest.raises(asyncio.CancelledError):
await task


async def test_cancelled_connection_is_not_usable_until_cancellation(connect,
loop):
async def inner(future, cursor):
future.set_result(None)
await cursor.execute("SELECT pg_sleep(10)")

fut = asyncio.Future(loop=loop)
conn = await connect()
cur = await conn.cursor()

task = ensure_future(inner(fut, cur), loop=loop)
await fut
await asyncio.sleep(0.1, loop=loop)

task.cancel()

for i in range(100):
await asyncio.sleep(0)
if conn._cancelling:
break
else:
assert False, "Connection did not start cancelling"

# cur = await conn.cursor()
with pytest.raises(RuntimeError) as e:
await cur.execute('SELECT 1')
assert str(e.value) == ('cursor.execute() called while connection '
'is being cancelled')
with pytest.raises(psycopg2.ProgrammingError):
await conn.cancel()


async def test_close2(connect, loop):
Expand Down Expand Up @@ -442,8 +388,6 @@ async def test_execute_twice(connect):
with pytest.raises(RuntimeError):
next(coro2.__await__())

await conn.cancel()


async def test_connect_to_unsupported_port(unused_port, loop, pg_params):
port = unused_port()
Expand Down Expand Up @@ -550,16 +494,13 @@ async def test_notifies(connect):
cur1.close()


async def test_close_cursor_on_timeout_error(connect):
async def test_close_connection_on_timeout_error(connect):
conn = await connect()
cur = await conn.cursor(timeout=0.01)
with pytest.raises(asyncio.TimeoutError):
await cur.execute("SELECT pg_sleep(10)")

assert cur.closed
assert not conn.closed

conn.close()
assert conn.closed


async def test_issue_111_crash_on_connect_error():
Expand Down
4 changes: 2 additions & 2 deletions tests/test_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ async def test_unlimited_size(create_pool):
assert pool._free.maxlen is None


async def test_connection_in_good_state_after_timeout(create_pool):
async def test_connection_closed_after_timeout(create_pool):
async def sleep(conn):
cur = await conn.cursor()
await cur.execute('SELECT pg_sleep(10);')
Expand All @@ -457,7 +457,7 @@ async def sleep(conn):
with pytest.raises(asyncio.TimeoutError):
await sleep(conn)

assert 1 == pool.freesize
assert 0 == pool.freesize

with (await pool) as conn:
cur = await conn.cursor()
Expand Down

0 comments on commit a485878

Please sign in to comment.