Skip to content

Commit baf5ce7

Browse files
committed
Invalidate statement cache on schema changes affecting statement result.
PostgreSQL will raise an exception when it detects that the result type of the query has changed from when the statement was prepared. This may happen, for example, after an ALTER TABLE or SET search_path. When this happens, and there is no transaction running, we can simply re-prepare the statement and try again. If the transaction _is_ running, this error will put it into an error state, and we have no choice but to raise an exception. The original error is somewhat cryptic, so we raise a custom InvalidCachedStatementError with the original server exception as context. In either case we clear the statement cache for this connection and all other connections of the pool this connection belongs to (if any). See #72 and #76 for discussion. Fixes: #72.
1 parent 0dd8fb6 commit baf5ce7

File tree

9 files changed

+274
-60
lines changed

9 files changed

+274
-60
lines changed

asyncpg/connection.py

+58-13
Original file line numberDiff line numberDiff line change
@@ -187,9 +187,7 @@ async def execute(self, query: str, *args, timeout: float=None) -> str:
187187
if not args:
188188
return await self._protocol.query(query, timeout)
189189

190-
stmt = await self._get_statement(query, timeout)
191-
_, status, _ = await self._protocol.bind_execute(stmt, args, '', 0,
192-
True, timeout)
190+
_, status, _ = await self._do_execute(query, args, 0, timeout, True)
193191
return status.decode()
194192

195193
async def executemany(self, command: str, args, timeout: float=None):
@@ -283,10 +281,7 @@ async def fetch(self, query, *args, timeout=None) -> list:
283281
284282
:return list: A list of :class:`Record` instances.
285283
"""
286-
stmt = await self._get_statement(query, timeout)
287-
data = await self._protocol.bind_execute(stmt, args, '', 0,
288-
False, timeout)
289-
return data
284+
return await self._do_execute(query, args, 0, timeout)
290285

291286
async def fetchval(self, query, *args, column=0, timeout=None):
292287
"""Run a query and return a value in the first row.
@@ -302,9 +297,7 @@ async def fetchval(self, query, *args, column=0, timeout=None):
302297
303298
:return: The value of the specified column of the first record.
304299
"""
305-
stmt = await self._get_statement(query, timeout)
306-
data = await self._protocol.bind_execute(stmt, args, '', 1,
307-
False, timeout)
300+
data = await self._do_execute(query, args, 1, timeout)
308301
if not data:
309302
return None
310303
return data[0][column]
@@ -318,9 +311,7 @@ async def fetchrow(self, query, *args, timeout=None):
318311
319312
:return: The first row as a :class:`Record` instance.
320313
"""
321-
stmt = await self._get_statement(query, timeout)
322-
data = await self._protocol.bind_execute(stmt, args, '', 1,
323-
False, timeout)
314+
data = await self._do_execute(query, args, 1, timeout)
324315
if not data:
325316
return None
326317
return data[0]
@@ -551,6 +542,60 @@ def _set_proxy(self, proxy):
551542

552543
self._proxy = proxy
553544

545+
def _drop_local_statement_cache(self):
546+
self._stmt_cache.clear()
547+
548+
def _drop_global_statement_cache(self):
549+
if self._proxy is not None:
550+
# This connection is a member of a pool, so we delegate
551+
# the cache drop to the pool.
552+
pool = self._proxy._holder._pool
553+
pool._drop_statement_cache()
554+
else:
555+
self._drop_local_statement_cache()
556+
557+
async def _do_execute(self, query, args, limit, timeout,
558+
return_status=False):
559+
stmt = await self._get_statement(query, timeout)
560+
561+
try:
562+
result = await self._protocol.bind_execute(
563+
stmt, args, '', limit, return_status, timeout)
564+
565+
except exceptions.InvalidCachedStatementError as e:
566+
# PostgreSQL will raise an exception when it detects
567+
# that the result type of the query has changed from
568+
# when the statement was prepared. This may happen,
569+
# for example, after an ALTER TABLE or SET search_path.
570+
#
571+
# When this happens, and there is no transaction running,
572+
# we can simply re-prepare the statement and try once
573+
# again. We deliberately retry only once as this is
574+
# supposed to be a rare occurrence.
575+
#
576+
# If the transaction _is_ running, this error will put it
577+
# into an error state, and we have no choice but to
578+
# re-raise the exception.
579+
#
580+
# In either case we clear the statement cache for this
581+
# connection and all other connections of the pool this
582+
# connection belongs to (if any).
583+
#
584+
# See https://github.com/MagicStack/asyncpg/issues/72
585+
# and https://github.com/MagicStack/asyncpg/issues/76
586+
# for discussion.
587+
#
588+
self._drop_global_statement_cache()
589+
590+
if self._protocol.is_in_transaction():
591+
raise
592+
else:
593+
stmt = await self._get_statement(query, timeout)
594+
result = await self._protocol.bind_execute(
595+
stmt, args, '', limit, return_status, timeout)
596+
597+
return result
598+
554599

555600
async def connect(dsn=None, *,
556601
host=None, port=None,

asyncpg/exceptions/__init__.py

+14-16
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,3 @@
1-
# Copyright (C) 2016-present the ayncpg authors and contributors
2-
# <see AUTHORS file>
3-
#
4-
# This module is part of asyncpg and is released under
5-
# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0
6-
7-
81
# GENERATED FROM postgresql/src/backend/utils/errcodes.txt
92
# DO NOT MODIFY, use tools/generate_exceptions.py to update
103

@@ -92,6 +85,10 @@ class FeatureNotSupportedError(_base.PostgresError):
9285
sqlstate = '0A000'
9386

9487

88+
class InvalidCachedStatementError(FeatureNotSupportedError):
89+
pass
90+
91+
9592
class InvalidTransactionInitiationError(_base.PostgresError):
9693
sqlstate = '0B000'
9794

@@ -1025,15 +1022,16 @@ class IndexCorruptedError(InternalServerError):
10251022
'InvalidArgumentForPowerFunctionError',
10261023
'InvalidArgumentForWidthBucketFunctionError',
10271024
'InvalidAuthorizationSpecificationError',
1028-
'InvalidBinaryRepresentationError', 'InvalidCatalogNameError',
1029-
'InvalidCharacterValueForCastError', 'InvalidColumnDefinitionError',
1030-
'InvalidColumnReferenceError', 'InvalidCursorDefinitionError',
1031-
'InvalidCursorNameError', 'InvalidCursorStateError',
1032-
'InvalidDatabaseDefinitionError', 'InvalidDatetimeFormatError',
1033-
'InvalidEscapeCharacterError', 'InvalidEscapeOctetError',
1034-
'InvalidEscapeSequenceError', 'InvalidForeignKeyError',
1035-
'InvalidFunctionDefinitionError', 'InvalidGrantOperationError',
1036-
'InvalidGrantorError', 'InvalidIndicatorParameterValueError',
1025+
'InvalidBinaryRepresentationError', 'InvalidCachedStatementError',
1026+
'InvalidCatalogNameError', 'InvalidCharacterValueForCastError',
1027+
'InvalidColumnDefinitionError', 'InvalidColumnReferenceError',
1028+
'InvalidCursorDefinitionError', 'InvalidCursorNameError',
1029+
'InvalidCursorStateError', 'InvalidDatabaseDefinitionError',
1030+
'InvalidDatetimeFormatError', 'InvalidEscapeCharacterError',
1031+
'InvalidEscapeOctetError', 'InvalidEscapeSequenceError',
1032+
'InvalidForeignKeyError', 'InvalidFunctionDefinitionError',
1033+
'InvalidGrantOperationError', 'InvalidGrantorError',
1034+
'InvalidIndicatorParameterValueError',
10371035
'InvalidLocatorSpecificationError', 'InvalidNameError',
10381036
'InvalidObjectDefinitionError', 'InvalidParameterValueError',
10391037
'InvalidPasswordError', 'InvalidPreparedStatementDefinitionError',

asyncpg/exceptions/_base.py

+41-7
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,11 @@
1212
'InterfaceError')
1313

1414

15+
def _is_asyncpg_class(cls):
16+
modname = cls.__module__
17+
return modname == 'asyncpg' or modname.startswith('asyncpg.')
18+
19+
1520
class PostgresMessageMeta(type):
1621
_message_map = {}
1722
_field_map = {
@@ -40,8 +45,7 @@ def __new__(mcls, name, bases, dct):
4045
for f in mcls._field_map.values():
4146
setattr(cls, f, None)
4247

43-
if (cls.__module__ == 'asyncpg' or
44-
cls.__module__.startswith('asyncpg.')):
48+
if _is_asyncpg_class(cls):
4549
mod = sys.modules[cls.__module__]
4650
if hasattr(mod, name):
4751
raise RuntimeError('exception class redefinition: {}'.format(
@@ -74,21 +78,51 @@ def __str__(self):
7478
return msg
7579

7680
@classmethod
77-
def new(cls, fields, query=None):
81+
def _get_error_template(cls, fields, query):
7882
errcode = fields.get('C')
7983
mcls = cls.__class__
8084
exccls = mcls.get_message_class_for_sqlstate(errcode)
81-
mapped = {
85+
dct = {
8286
'query': query
8387
}
8488

8589
for k, v in fields.items():
8690
field = mcls._field_map.get(k)
8791
if field:
88-
mapped[field] = v
92+
dct[field] = v
8993

90-
e = exccls(mapped.get('message', ''))
91-
e.__dict__.update(mapped)
94+
return exccls, dct
95+
96+
@classmethod
97+
def new(cls, fields, query=None):
98+
exccls, dct = cls._get_error_template(fields, query)
99+
100+
message = dct.get('message', '')
101+
102+
# PostgreSQL will raise an exception when it detects
103+
# that the result type of the query has changed from
104+
# when the statement was prepared.
105+
#
106+
# The original error is somewhat cryptic and unspecific,
107+
# so we raise a custom subclass that is easier to handle
108+
# and identify.
109+
#
110+
# Note that we specifically do not rely on the error
111+
# message, as it is localizable.
112+
is_icse = (
113+
exccls.__name__ == 'FeatureNotSupportedError' and
114+
_is_asyncpg_class(exccls) and
115+
dct.get('server_source_function') == 'RevalidateCachedQuery'
116+
)
117+
118+
if is_icse:
119+
exceptions = sys.modules[exccls.__module__]
120+
exccls = exceptions.InvalidCachedStatementError
121+
message = ('cached statement plan is invalid due to a database '
122+
'schema or configuration change')
123+
124+
e = exccls(message)
125+
e.__dict__.update(dct)
92126

93127
return e
94128

asyncpg/pool.py

+6
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,12 @@ def _check_init(self):
402402
if self._closed:
403403
raise exceptions.InterfaceError('pool is closed')
404404

405+
def _drop_statement_cache(self):
406+
# Drop statement cache for all connections in the pool.
407+
for ch in self._holders:
408+
if ch._con is not None:
409+
ch._con._drop_local_statement_cache()
410+
405411
def __await__(self):
406412
return self._async__init__().__await__()
407413

asyncpg/prepared_stmt.py

+10-14
Original file line numberDiff line numberDiff line change
@@ -154,11 +154,7 @@ async def fetch(self, *args, timeout=None):
154154
155155
:return: A list of :class:`Record` instances.
156156
"""
157-
self.__check_open()
158-
protocol = self._connection._protocol
159-
data, status, _ = await protocol.bind_execute(
160-
self._state, args, '', 0, True, timeout)
161-
self._last_status = status
157+
data = await self.__bind_execute(args, 0, timeout)
162158
return data
163159

164160
async def fetchval(self, *args, column=0, timeout=None):
@@ -174,11 +170,7 @@ async def fetchval(self, *args, column=0, timeout=None):
174170
175171
:return: The value of the specified column of the first record.
176172
"""
177-
self.__check_open()
178-
protocol = self._connection._protocol
179-
data, status, _ = await protocol.bind_execute(
180-
self._state, args, '', 1, True, timeout)
181-
self._last_status = status
173+
data = await self.__bind_execute(args, 1, timeout)
182174
if not data:
183175
return None
184176
return data[0][column]
@@ -192,14 +184,18 @@ async def fetchrow(self, *args, timeout=None):
192184
193185
:return: The first row as a :class:`Record` instance.
194186
"""
187+
data = await self.__bind_execute(args, 1, timeout)
188+
if not data:
189+
return None
190+
return data[0]
191+
192+
async def __bind_execute(self, args, limit, timeout):
195193
self.__check_open()
196194
protocol = self._connection._protocol
197195
data, status, _ = await protocol.bind_execute(
198-
self._state, args, '', 1, True, timeout)
196+
self._state, args, '', limit, True, timeout)
199197
self._last_status = status
200-
if not data:
201-
return None
202-
return data[0]
198+
return data
203199

204200
def __check_open(self):
205201
if self._state.closed:

asyncpg/protocol/protocol.pyx

+3-1
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,9 @@ cdef class BaseProtocol(CoreProtocol):
121121
return self.settings
122122

123123
def is_in_transaction(self):
124-
return self.xact_status == PQTRANS_INTRANS
124+
# PQTRANS_INTRANS = idle, within transaction block
125+
# PQTRANS_INERROR = idle, within failed transaction
126+
return self.xact_status in (PQTRANS_INTRANS, PQTRANS_INERROR)
125127

126128
async def prepare(self, stmt_name, query, timeout):
127129
if self.cancel_waiter is not None:

tests/test_cache_invalidation.py

+85
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
# Copyright (C) 2016-present the ayncpg authors and contributors
2+
# <see AUTHORS file>
3+
#
4+
# This module is part of asyncpg and is released under
5+
# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0
6+
7+
8+
import asyncpg
9+
from asyncpg import _testbase as tb
10+
11+
12+
class TestCacheInvalidation(tb.ConnectedTestCase):
13+
async def test_prepare_cache_invalidation_silent(self):
14+
await self.con.execute('CREATE TABLE tab1(a int, b int)')
15+
16+
try:
17+
await self.con.execute('INSERT INTO tab1 VALUES (1, 2)')
18+
result = await self.con.fetchrow('SELECT * FROM tab1')
19+
self.assertEqual(result, (1, 2))
20+
21+
await self.con.execute(
22+
'ALTER TABLE tab1 ALTER COLUMN b SET DATA TYPE text')
23+
24+
result = await self.con.fetchrow('SELECT * FROM tab1')
25+
self.assertEqual(result, (1, '2'))
26+
finally:
27+
await self.con.execute('DROP TABLE tab1')
28+
29+
async def test_prepare_cache_invalidation_in_transaction(self):
30+
await self.con.execute('CREATE TABLE tab1(a int, b int)')
31+
32+
try:
33+
await self.con.execute('INSERT INTO tab1 VALUES (1, 2)')
34+
result = await self.con.fetchrow('SELECT * FROM tab1')
35+
self.assertEqual(result, (1, 2))
36+
37+
await self.con.execute(
38+
'ALTER TABLE tab1 ALTER COLUMN b SET DATA TYPE text')
39+
40+
with self.assertRaisesRegex(asyncpg.InvalidCachedStatementError,
41+
'cached statement plan is invalid'):
42+
async with self.con.transaction():
43+
result = await self.con.fetchrow('SELECT * FROM tab1')
44+
45+
# This is now OK,
46+
result = await self.con.fetchrow('SELECT * FROM tab1')
47+
self.assertEqual(result, (1, '2'))
48+
finally:
49+
await self.con.execute('DROP TABLE tab1')
50+
51+
async def test_prepare_cache_invalidation_in_pool(self):
52+
pool = await self.create_pool(database='postgres',
53+
min_size=2, max_size=2)
54+
55+
await self.con.execute('CREATE TABLE tab1(a int, b int)')
56+
57+
try:
58+
await self.con.execute('INSERT INTO tab1 VALUES (1, 2)')
59+
60+
con1 = await pool.acquire()
61+
con2 = await pool.acquire()
62+
63+
result = await con1.fetchrow('SELECT * FROM tab1')
64+
self.assertEqual(result, (1, 2))
65+
66+
result = await con2.fetchrow('SELECT * FROM tab1')
67+
self.assertEqual(result, (1, 2))
68+
69+
await self.con.execute(
70+
'ALTER TABLE tab1 ALTER COLUMN b SET DATA TYPE text')
71+
72+
# con1 tries the same plan, will invalidate the cache
73+
# for the entire pool.
74+
result = await con1.fetchrow('SELECT * FROM tab1')
75+
self.assertEqual(result, (1, '2'))
76+
77+
async with con2.transaction():
78+
# This should work, as con1 should have invalidated
79+
# the plan cache.
80+
result = await con2.fetchrow('SELECT * FROM tab1')
81+
self.assertEqual(result, (1, '2'))
82+
83+
finally:
84+
await self.con.execute('DROP TABLE tab1')
85+
await pool.close()

0 commit comments

Comments
 (0)