From 358d76c3d21e2a10af8e8a90e69409b6778cff19 Mon Sep 17 00:00:00 2001 From: Aleksey Mashanov Date: Wed, 15 Aug 2018 21:23:58 +0300 Subject: [PATCH 1/2] Group B/E/S into one TCP packet --- asyncpg/protocol/coreproto.pxd | 1 - asyncpg/protocol/coreproto.pyx | 34 +++++++++++++++++++--------------- 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/asyncpg/protocol/coreproto.pxd b/asyncpg/protocol/coreproto.pxd index 60efa591..d35334e5 100644 --- a/asyncpg/protocol/coreproto.pxd +++ b/asyncpg/protocol/coreproto.pxd @@ -134,7 +134,6 @@ cdef class CoreProtocol: cdef _auth_password_message_md5(self, bytes salt) cdef _write(self, buf) - cdef inline _write_sync_message(self) cdef _read_server_messages(self) diff --git a/asyncpg/protocol/coreproto.pyx b/asyncpg/protocol/coreproto.pyx index acfec953..da0ecfbc 100644 --- a/asyncpg/protocol/coreproto.pyx +++ b/asyncpg/protocol/coreproto.pyx @@ -35,9 +35,6 @@ cdef class CoreProtocol: cdef _write(self, buf): self.transport.write(memoryview(buf)) - cdef inline _write_sync_message(self): - self.transport.write(SYNC_MESSAGE) - cdef _read_server_messages(self): cdef: char mtype @@ -749,14 +746,12 @@ cdef class CoreProtocol: self._ensure_connected() self._set_state(PROTOCOL_PREPARE) - packet = WriteBuffer.new() - buf = WriteBuffer.new_message(b'P') buf.write_str(stmt_name, self.encoding) buf.write_str(query, self.encoding) buf.write_int16(0) buf.end_message() - packet.write_buffer(buf) + packet = buf buf = WriteBuffer.new_message(b'D') buf.write_byte(b'S') @@ -766,23 +761,27 @@ cdef class CoreProtocol: packet.write_bytes(SYNC_MESSAGE) - self.transport.write(memoryview(packet)) + self._write(packet) cdef _send_bind_message(self, str portal_name, str stmt_name, WriteBuffer bind_data, int32_t limit): - cdef WriteBuffer buf + cdef: + WriteBuffer packet + WriteBuffer buf buf = self._build_bind_message(portal_name, stmt_name, bind_data) - self._write(buf) + packet = buf buf = WriteBuffer.new_message(b'E') buf.write_str(portal_name, self.encoding) # name of the portal buf.write_int32(limit) # number of rows to return; 0 - all buf.end_message() - self._write(buf) + packet.write_buffer(buf) - self._write_sync_message() + packet.write_bytes(SYNC_MESSAGE) + + self._write(packet) cdef _bind_execute(self, str portal_name, str stmt_name, WriteBuffer bind_data, int32_t limit): @@ -833,8 +832,10 @@ cdef class CoreProtocol: buf.write_str(portal_name, self.encoding) # name of the portal buf.write_int32(limit) # number of rows to return; 0 - all buf.end_message() + + buf.write_bytes(SYNC_MESSAGE) + self._write(buf) - self._write_sync_message() cdef _bind(self, str portal_name, str stmt_name, WriteBuffer bind_data): @@ -845,8 +846,10 @@ cdef class CoreProtocol: self._set_state(PROTOCOL_BIND) buf = self._build_bind_message(portal_name, stmt_name, bind_data) + + buf.write_bytes(SYNC_MESSAGE) + self._write(buf) - self._write_sync_message() cdef _close(self, str name, bint is_portal): cdef WriteBuffer buf @@ -863,9 +866,10 @@ cdef class CoreProtocol: buf.write_str(name, self.encoding) buf.end_message() - self._write(buf) - self._write_sync_message() + buf.write_bytes(SYNC_MESSAGE) + + self._write(buf) cdef _simple_query(self, str query): cdef WriteBuffer buf From 7b43417f03c970af542d1b822ccf8d88ed3561c5 Mon Sep 17 00:00:00 2001 From: Aleksey Mashanov Date: Tue, 21 Aug 2018 22:19:58 +0300 Subject: [PATCH 2/2] Compatibility with pgbouncer with pool_mode != session Add `session` parameter to `connection()` with default value `True`. When set to `False` the client will limit usage of prepared statements to unnamed in a transaction only. As a side effect handling of unnamed prepared statements improved irrespective of `session` value - they are automatically reprepared if something invalidating them occurs. --- asyncpg/connect_utils.py | 10 +- asyncpg/connection.py | 24 ++--- asyncpg/exceptions/_base.py | 14 +-- asyncpg/protocol/coreproto.pxd | 24 +++-- asyncpg/protocol/coreproto.pyx | 141 ++++++++++++++++++----------- asyncpg/protocol/prepared_stmt.pxd | 2 + asyncpg/protocol/prepared_stmt.pyx | 1 + asyncpg/protocol/protocol.pyx | 65 +++++++++---- docs/api/index.rst | 3 +- docs/faq.rst | 13 +-- tests/test_connect.py | 45 +++++++-- 11 files changed, 217 insertions(+), 125 deletions(-) diff --git a/asyncpg/connect_utils.py b/asyncpg/connect_utils.py index 9823d048..9af509c5 100644 --- a/asyncpg/connect_utils.py +++ b/asyncpg/connect_utils.py @@ -33,6 +33,7 @@ 'database', 'ssl', 'connect_timeout', + 'session', 'server_settings', ]) @@ -135,7 +136,7 @@ def _read_password_from_pgpass( def _parse_connect_dsn_and_args(*, dsn, host, port, user, password, passfile, database, ssl, - connect_timeout, server_settings): + connect_timeout, session, server_settings): if host is not None and not isinstance(host, str): raise TypeError( 'host argument is expected to be str, got {!r}'.format( @@ -320,7 +321,8 @@ def _parse_connect_dsn_and_args(*, dsn, host, port, user, params = _ConnectionParameters( user=user, password=password, database=database, ssl=ssl, - connect_timeout=connect_timeout, server_settings=server_settings) + connect_timeout=connect_timeout, session=session, + server_settings=server_settings) return addrs, params @@ -330,7 +332,7 @@ def _parse_connect_arguments(*, dsn, host, port, user, password, passfile, statement_cache_size, max_cached_statement_lifetime, max_cacheable_statement_size, - ssl, server_settings): + session, ssl, server_settings): local_vars = locals() for var_name in {'max_cacheable_statement_size', @@ -359,7 +361,7 @@ def _parse_connect_arguments(*, dsn, host, port, user, password, passfile, dsn=dsn, host=host, port=port, user=user, password=password, passfile=passfile, ssl=ssl, database=database, connect_timeout=timeout, - server_settings=server_settings) + session=session, server_settings=server_settings) config = _ClientConfiguration( command_timeout=command_timeout, diff --git a/asyncpg/connection.py b/asyncpg/connection.py index 8f551a02..e594f1ff 100644 --- a/asyncpg/connection.py +++ b/asyncpg/connection.py @@ -315,13 +315,12 @@ async def _get_statement(self, query, timeout, *, named: bool=False, len(query) > self._config.max_cacheable_statement_size): use_cache = False - if use_cache or named: + if (use_cache or named) and self._protocol.session: stmt_name = self._get_unique_id('stmt') else: stmt_name = '' statement = await self._protocol.prepare(stmt_name, query, timeout) - need_reprepare = False types_with_missing_codecs = statement._init_types() tries = 0 while types_with_missing_codecs: @@ -334,10 +333,6 @@ async def _get_statement(self, query, timeout, *, named: bool=False, settings.register_data_types(types) - # The introspection query has used an anonymous statement, - # which has blown away the anonymous statement we've prepared - # for the query, so we need to re-prepare it. - need_reprepare = not intro_stmt.name and not statement.name types_with_missing_codecs = statement._init_types() tries += 1 if tries > 5: @@ -354,10 +349,6 @@ async def _get_statement(self, query, timeout, *, named: bool=False, # for the statement. statement._init_codecs() - if need_reprepare: - await self._protocol.prepare( - stmt_name, query, timeout, state=statement) - if use_cache: self._stmt_cache.put(query, statement) @@ -1466,6 +1457,7 @@ async def connect(dsn=None, *, statement_cache_size=100, max_cached_statement_lifetime=300, max_cacheable_statement_size=1024 * 15, + session=True, command_timeout=None, ssl=None, connection_class=Connection, @@ -1527,6 +1519,15 @@ async def connect(dsn=None, *, default). Pass ``0`` to allow all statements to be cached regardless of their size. + :param bool session: + if features which lifetime is PostgreSQL session can be used + by the client (the default is ``True``). Set this to ``False`` + if connecting to ``pgbouncer`` with ``pool_mode`` set to + ``transaction`` or ``statement``. + This will completely prevent usage of named prepared statements. + Unnmamed prepared statements will be used during transactions + only. + :param float command_timeout: the default timeout for operations on this connection (the default is ``None``: no timeout). @@ -1599,7 +1600,8 @@ class of the returned connection object. Must be a subclass of command_timeout=command_timeout, statement_cache_size=statement_cache_size, max_cached_statement_lifetime=max_cached_statement_lifetime, - max_cacheable_statement_size=max_cacheable_statement_size) + max_cacheable_statement_size=max_cacheable_statement_size, + session=session) class _StatementCacheEntry: diff --git a/asyncpg/exceptions/_base.py b/asyncpg/exceptions/_base.py index d59cf671..d0bc6c1b 100644 --- a/asyncpg/exceptions/_base.py +++ b/asyncpg/exceptions/_base.py @@ -132,17 +132,9 @@ def _make_constructor(cls, fields, query=None): hint = dct.get('hint', '') hint += textwrap.dedent("""\ - NOTE: pgbouncer with pool_mode set to "transaction" or - "statement" does not support prepared statements properly. - You have two options: - - * if you are using pgbouncer for connection pooling to a - single server, switch to the connection pool functionality - provided by asyncpg, it is a much better option for this - purpose; - - * if you have no option of avoiding the use of pgbouncer, - then you must switch pgbouncer's pool_mode to "session". + if you are using pgbouncer with pool_mode set to "transaction" + or "statement", then you must initialize Connection with + session set to False. """) dct['hint'] = hint diff --git a/asyncpg/protocol/coreproto.pxd b/asyncpg/protocol/coreproto.pxd index d35334e5..01283e31 100644 --- a/asyncpg/protocol/coreproto.pxd +++ b/asyncpg/protocol/coreproto.pxd @@ -104,6 +104,8 @@ cdef class CoreProtocol: # True - completed, False - suspended bint result_execute_completed + cpdef is_in_transaction(self) + cdef _process__auth(self, char mtype) cdef _process__prepare(self, char mtype) cdef _process__bind_execute(self, char mtype) @@ -143,21 +145,27 @@ cdef class CoreProtocol: cdef _ensure_connected(self) + cdef WriteBuffer _build_parse_message(self, str stmt_name, str query) + cdef WriteBuffer _build_describe_message(self, str stmt_name) cdef WriteBuffer _build_bind_message(self, str portal_name, str stmt_name, WriteBuffer bind_data) + cdef WriteBuffer _build_execute_message(self, str portal_name, + int32_t limit) cdef _connect(self) cdef _prepare(self, str stmt_name, str query) - cdef _send_bind_message(self, str portal_name, str stmt_name, - WriteBuffer bind_data, int32_t limit) - cdef _bind_execute(self, str portal_name, str stmt_name, - WriteBuffer bind_data, int32_t limit) - cdef _bind_execute_many(self, str portal_name, str stmt_name, - object bind_data) - cdef _bind(self, str portal_name, str stmt_name, - WriteBuffer bind_data) + cdef _send_parse_bind_execute(self, str stmt_name, str query, + str portal_name, WriteBuffer bind_data, + int32_t limit) + cdef _parse_bind_execute(self, str stmt_name, str query, + str portal_name, WriteBuffer bind_data, + int32_t limit) + cdef _parse_bind_execute_many(self, str stmt_name, str query, + str portal_name, object bind_data) + cdef _parse_bind(self, str stmt_name, str query, + str portal_name, WriteBuffer bind_data) cdef _execute(self, str portal_name, int32_t limit) cdef _close(self, str name, bint is_portal) cdef _simple_query(self, str query) diff --git a/asyncpg/protocol/coreproto.pyx b/asyncpg/protocol/coreproto.pyx index da0ecfbc..2c27446a 100644 --- a/asyncpg/protocol/coreproto.pyx +++ b/asyncpg/protocol/coreproto.pyx @@ -29,9 +29,15 @@ cdef class CoreProtocol: self._execute_iter = None self._execute_portal_name = None self._execute_stmt_name = None + self._execute_query = None self._reset_result() + cpdef is_in_transaction(self): + # PQTRANS_INTRANS = idle, within transaction block + # PQTRANS_INERROR = idle, within failed transaction + return self.xact_status in (PQTRANS_INTRANS, PQTRANS_INERROR) + cdef _write(self, buf): self.transport.write(memoryview(buf)) @@ -265,10 +271,15 @@ cdef class CoreProtocol: self.result = e self._push_result() else: + if self.con_params.session or self.is_in_transaction(): + query = None + else: + query = self._execute_query + # Next iteration over the executemany() arg sequence - self._send_bind_message( - self._execute_portal_name, self._execute_stmt_name, - buf, 0) + self._send_parse_bind_execute( + self._execute_stmt_name, query, + self._execute_portal_name, buf, 0) elif mtype == b'I': # EmptyQueryResponse @@ -681,6 +692,27 @@ cdef class CoreProtocol: if self.con_status != CONNECTION_OK: raise apg_exc.InternalClientError('not connected') + cdef WriteBuffer _build_parse_message(self, str stmt_name, str query): + cdef WriteBuffer buf + + buf = WriteBuffer.new_message(b'P') + buf.write_str(stmt_name, self.encoding) + buf.write_str(query, self.encoding) + buf.write_int16(0) + buf.end_message() + + return buf + + cdef WriteBuffer _build_describe_message(self, str stmt_name): + cdef WriteBuffer buf + + buf = WriteBuffer.new_message(b'D') + buf.write_byte(b'S') + buf.write_str(stmt_name, self.encoding) + buf.end_message() + + return buf + cdef WriteBuffer _build_bind_message(self, str portal_name, str stmt_name, WriteBuffer bind_data): @@ -696,6 +728,17 @@ cdef class CoreProtocol: buf.end_message() return buf + cdef WriteBuffer _build_execute_message(self, str portal_name, + int32_t limit): + cdef WriteBuffer buf + + buf = WriteBuffer.new_message(b'E') + buf.write_str(portal_name, self.encoding) # name of the portal + buf.write_int32(limit) # number of rows to return; 0 - all + buf.end_message() + + return buf + # API for subclasses cdef _connect(self): @@ -739,64 +782,50 @@ cdef class CoreProtocol: self._write(outbuf) cdef _prepare(self, str stmt_name, str query): - cdef: - WriteBuffer packet - WriteBuffer buf + cdef WriteBuffer buf self._ensure_connected() self._set_state(PROTOCOL_PREPARE) - buf = WriteBuffer.new_message(b'P') - buf.write_str(stmt_name, self.encoding) - buf.write_str(query, self.encoding) - buf.write_int16(0) - buf.end_message() - packet = buf - - buf = WriteBuffer.new_message(b'D') - buf.write_byte(b'S') - buf.write_str(stmt_name, self.encoding) - buf.end_message() - packet.write_buffer(buf) - - packet.write_bytes(SYNC_MESSAGE) + buf = self._build_parse_message(stmt_name, query) + buf.write_buffer(self._build_describe_message(stmt_name)) - self._write(packet) + buf.write_bytes(SYNC_MESSAGE) - cdef _send_bind_message(self, str portal_name, str stmt_name, - WriteBuffer bind_data, int32_t limit): + self._write(buf) + cdef _send_parse_bind_execute(self, str stmt_name, str query, + str portal_name, WriteBuffer bind_data, + int32_t limit): cdef: - WriteBuffer packet WriteBuffer buf + WriteBuffer pbuf buf = self._build_bind_message(portal_name, stmt_name, bind_data) - packet = buf + buf.write_buffer(self._build_execute_message(portal_name, limit)) - buf = WriteBuffer.new_message(b'E') - buf.write_str(portal_name, self.encoding) # name of the portal - buf.write_int32(limit) # number of rows to return; 0 - all - buf.end_message() - packet.write_buffer(buf) + if query is not None: + pbuf = self._build_parse_message(stmt_name, query) + pbuf.write_buffer(buf) + buf = pbuf - packet.write_bytes(SYNC_MESSAGE) - - self._write(packet) + buf.write_bytes(SYNC_MESSAGE) - cdef _bind_execute(self, str portal_name, str stmt_name, - WriteBuffer bind_data, int32_t limit): + self._write(buf) - cdef WriteBuffer buf + cdef _parse_bind_execute(self, str stmt_name, str query, str portal_name, + WriteBuffer bind_data, int32_t limit): self._ensure_connected() self._set_state(PROTOCOL_BIND_EXECUTE) self.result = [] - self._send_bind_message(portal_name, stmt_name, bind_data, limit) + self._send_parse_bind_execute(stmt_name, query, + portal_name, bind_data, limit) - cdef _bind_execute_many(self, str portal_name, str stmt_name, - object bind_data): + cdef _parse_bind_execute_many(self, str stmt_name, str query, + str portal_name, object bind_data): cdef WriteBuffer buf @@ -808,6 +837,7 @@ cdef class CoreProtocol: self._execute_iter = bind_data self._execute_portal_name = portal_name self._execute_stmt_name = stmt_name + self._execute_query = query try: buf = next(bind_data) @@ -818,34 +848,39 @@ cdef class CoreProtocol: self.result = e self._push_result() else: - self._send_bind_message(portal_name, stmt_name, buf, 0) + self._send_parse_bind_execute(stmt_name, query, + portal_name, buf, 0) - cdef _execute(self, str portal_name, int32_t limit): - cdef WriteBuffer buf + cdef _parse_bind(self, str stmt_name, str query, + str portal_name, WriteBuffer bind_data): + + cdef: + WriteBuffer buf + WriteBuffer pbuf self._ensure_connected() - self._set_state(PROTOCOL_EXECUTE) + self._set_state(PROTOCOL_BIND) - self.result = [] + buf = self._build_bind_message(portal_name, stmt_name, bind_data) - buf = WriteBuffer.new_message(b'E') - buf.write_str(portal_name, self.encoding) # name of the portal - buf.write_int32(limit) # number of rows to return; 0 - all - buf.end_message() + if query is not None: + pbuf = self._build_parse_message(stmt_name, query) + pbuf.write_buffer(buf) + buf = pbuf buf.write_bytes(SYNC_MESSAGE) self._write(buf) - cdef _bind(self, str portal_name, str stmt_name, - WriteBuffer bind_data): - + cdef _execute(self, str portal_name, int32_t limit): cdef WriteBuffer buf self._ensure_connected() - self._set_state(PROTOCOL_BIND) + self._set_state(PROTOCOL_EXECUTE) - buf = self._build_bind_message(portal_name, stmt_name, bind_data) + self.result = [] + + buf = self._build_execute_message(portal_name, limit) buf.write_bytes(SYNC_MESSAGE) diff --git a/asyncpg/protocol/prepared_stmt.pxd b/asyncpg/protocol/prepared_stmt.pxd index 84c546b3..d3445274 100644 --- a/asyncpg/protocol/prepared_stmt.pxd +++ b/asyncpg/protocol/prepared_stmt.pxd @@ -12,6 +12,8 @@ cdef class PreparedStatementState: readonly bint closed readonly int refs + public bint need_reprepare + FastReadBuffer buffer list row_desc diff --git a/asyncpg/protocol/prepared_stmt.pyx b/asyncpg/protocol/prepared_stmt.pyx index 8e56fa8a..f22bdeef 100644 --- a/asyncpg/protocol/prepared_stmt.pyx +++ b/asyncpg/protocol/prepared_stmt.pyx @@ -19,6 +19,7 @@ cdef class PreparedStatementState: self.args_codecs = self.rows_codecs = None self.args_num = self.cols_num = 0 self.cols_desc = None + self.need_reprepare = False self.closed = False self.refs = 0 self.buffer = FastReadBuffer.new() diff --git a/asyncpg/protocol/protocol.pyx b/asyncpg/protocol/protocol.pyx index 6731719c..d81c0f5b 100644 --- a/asyncpg/protocol/protocol.pyx +++ b/asyncpg/protocol/protocol.pyx @@ -105,6 +105,9 @@ cdef class BaseProtocol(CoreProtocol): self.address = addr self.settings = ConnectionSettings((self.address, con_params.database)) + self.session = con_params.session + self.last_unnamed = None + self.statement = None self.return_extra = False @@ -138,11 +141,6 @@ cdef class BaseProtocol(CoreProtocol): def get_settings(self): return self.settings - def is_in_transaction(self): - # PQTRANS_INTRANS = idle, within transaction block - # PQTRANS_INERROR = idle, within failed transaction - return self.xact_status in (PQTRANS_INTRANS, PQTRANS_INERROR) - cdef inline resume_reading(self): if not self.is_reading: self.is_reading = True @@ -172,6 +170,8 @@ cdef class BaseProtocol(CoreProtocol): if state is None: state = PreparedStatementState(stmt_name, query, self) self.statement = state + if stmt_name == '': + self.last_unnamed = state except Exception as ex: waiter.set_exception(ex) self._coreproto_error() @@ -195,10 +195,17 @@ cdef class BaseProtocol(CoreProtocol): waiter = self._new_waiter(timeout) try: - self._bind_execute( - portal_name, - state.name, - args_buf, + if state.name == '' and (state != self.last_unnamed + or state.need_reprepare): + query = state.query + state.need_reprepare = False + self.last_unnamed = state + else: + query = None + + self._parse_bind_execute( + state.name, query, + portal_name, args_buf, limit) # network op self.last_query = state.query @@ -232,10 +239,17 @@ cdef class BaseProtocol(CoreProtocol): waiter = self._new_waiter(timeout) try: - self._bind_execute_many( - portal_name, - state.name, - arg_bufs) # network op + if state.name == '' and (state != self.last_unnamed + or state.need_reprepare): + query = state.query + state.need_reprepare = False + self.last_unnamed = state + else: + query = None + + self._parse_bind_execute_many( + state.name, query, + portal_name, arg_bufs) # network op self.last_query = state.query self.statement = state @@ -263,10 +277,17 @@ cdef class BaseProtocol(CoreProtocol): waiter = self._new_waiter(timeout) try: - self._bind( - portal_name, - state.name, - args_buf) # network op + if state.name == '' and (state != self.last_unnamed + or state.need_reprepare): + query = state.query + state.need_reprepare = False + self.last_unnamed = state + else: + query = None + + self._parse_bind( + state.name, query, + portal_name, args_buf) # network op self.last_query = state.query self.statement = state @@ -314,6 +335,9 @@ cdef class BaseProtocol(CoreProtocol): await self.cancel_sent_waiter self.cancel_sent_waiter = None + if self.last_unnamed is not None: + self.last_unnamed.need_reprepare = True + self._check_state() # query() needs to call _get_timeout instead of _get_timeout_impl # for consistent validation, as it is called differently from @@ -520,6 +544,10 @@ cdef class BaseProtocol(CoreProtocol): 'cannot close prepared statement; refs == {} != 0'.format( state.refs)) + if state.name == '' and (state != self.last_unnamed + or state.need_reprepare): + return + timeout = self._get_timeout_impl(timeout) waiter = self._new_waiter(timeout) try: @@ -855,6 +883,9 @@ cdef class BaseProtocol(CoreProtocol): self.statement = None self.last_query = None self.return_extra = False + if (not self.session and not self.is_in_transaction() + and self.last_unnamed is not None): + self.last_unnamed.need_reprepare = True cdef _on_notice(self, parsed): con = self.get_connection() diff --git a/docs/api/index.rst b/docs/api/index.rst index d63ab8d8..32a8ca5c 100644 --- a/docs/api/index.rst +++ b/docs/api/index.rst @@ -56,8 +56,7 @@ a need to run the same query again. .. warning:: If you are using pgbouncer with ``pool_mode`` set to ``transaction`` or - ``statement``, prepared statements will not work correctly. See - :ref:`asyncpg-prepared-stmt-errors` for more information. + ``statement``, initialize Connection with session set to False. .. autoclass:: asyncpg.prepared_stmt.PreparedStatement() diff --git a/docs/faq.rst b/docs/faq.rst index 120b05af..335e44b9 100644 --- a/docs/faq.rst +++ b/docs/faq.rst @@ -56,15 +56,10 @@ already exists`` errors, you are most likely not connecting to the PostgreSQL server directly, but via `pgbouncer `_. pgbouncer, when in the ``"transaction"`` or ``"statement"`` pooling mode, does not support -prepared statements. You have two options: - -* if you are using pgbouncer for connection pooling to a single server, - switch to the :ref:`connection pool ` - functionality provided by asyncpg, it is a much better option for this - purpose; - -* if you have no option of avoiding the use of pgbouncer, then you need to - switch pgbouncer's ``pool_mode`` to ``session``. +any features with session lifetime (including prepared statements). +You must initialize :class:`Connection` with ``session`` set to ``False``, this +will prevent the client from using named prepared statements and will limit it +to use unnamed prepared statements in a transaction only. Why do I get ``PostgresSyntaxError`` when using ``expression IN $1``? diff --git a/tests/test_connect.py b/tests/test_connect.py index 06ceb77e..8abd8ed3 100644 --- a/tests/test_connect.py +++ b/tests/test_connect.py @@ -206,7 +206,8 @@ class TestConnectParams(tb.TestCase): 'result': ([('host', 123)], { 'user': 'user', 'password': 'passw', - 'database': 'testdb'}) + 'database': 'testdb', + 'session': True}) }, { @@ -227,7 +228,8 @@ class TestConnectParams(tb.TestCase): 'result': ([('host2', 456)], { 'user': 'user2', 'password': 'passw2', - 'database': 'db2'}) + 'database': 'db2', + 'session': True}) }, { @@ -250,7 +252,8 @@ class TestConnectParams(tb.TestCase): 'result': ([('host2', 456)], { 'user': 'user2', 'password': 'passw2', - 'database': 'db2'}) + 'database': 'db2', + 'session': True}) }, { @@ -267,7 +270,8 @@ class TestConnectParams(tb.TestCase): 'result': ([('localhost', 5555)], { 'user': 'user3', 'password': '123123', - 'database': 'abcdef'}) + 'database': 'abcdef', + 'session': True}) }, { @@ -275,7 +279,8 @@ class TestConnectParams(tb.TestCase): 'result': ([('localhost', 5555)], { 'user': 'user3', 'password': '123123', - 'database': 'abcdef'}) + 'database': 'abcdef', + 'session': True}) }, { @@ -291,7 +296,8 @@ class TestConnectParams(tb.TestCase): 'server_settings': {'param': '123'}, 'user': 'me', 'password': 'ask', - 'database': 'db'}) + 'database': 'db', + 'session': True}) }, { @@ -308,14 +314,16 @@ class TestConnectParams(tb.TestCase): 'server_settings': {'aa': 'bb', 'param': '123'}, 'user': 'me', 'password': 'ask', - 'database': 'db'}) + 'database': 'db', + 'session': True}) }, { 'dsn': 'postgresql:///dbname?host=/unix_sock/test&user=spam', 'result': ([os.path.join('/unix_sock/test', '.s.PGSQL.5432')], { 'user': 'spam', - 'database': 'dbname'}) + 'database': 'dbname', + 'session': True}) }, { @@ -384,7 +392,8 @@ def run_testcase(self, testcase): addrs, params = connect_utils._parse_connect_dsn_and_args( dsn=dsn, host=host, port=port, user=user, password=password, passfile=passfile, database=database, ssl=None, - connect_timeout=None, server_settings=server_settings) + connect_timeout=None, session=True, + server_settings=server_settings) params = {k: v for k, v in params._asdict().items() if v is not None} @@ -429,7 +438,11 @@ def test_test_connect_params_run_testcase(self): 'host': 'abc', 'result': ( [('abc', 5432)], - {'user': '__test__', 'database': '__test__'} + { + 'user': '__test__', + 'database': '__test__', + 'session': True, + } ) }) @@ -467,6 +480,7 @@ def test_connect_pgpass_regular(self): 'password': 'password from pgpass for user@abc', 'user': 'user', 'database': 'db', + 'session': True, } ) }) @@ -483,6 +497,7 @@ def test_connect_pgpass_regular(self): 'password': 'password from pgpass for user@abc', 'user': 'user', 'database': 'db', + 'session': True, } ) }) @@ -497,6 +512,7 @@ def test_connect_pgpass_regular(self): 'password': 'password from pgpass for user@abc', 'user': 'user', 'database': 'db', + 'session': True, } ) }) @@ -512,6 +528,7 @@ def test_connect_pgpass_regular(self): 'password': 'password from pgpass for localhost', 'user': 'user', 'database': 'db', + 'session': True, } ) }) @@ -529,6 +546,7 @@ def test_connect_pgpass_regular(self): 'password': 'password from pgpass for localhost', 'user': 'user', 'database': 'db', + 'session': True, } ) }) @@ -546,6 +564,7 @@ def test_connect_pgpass_regular(self): 'password': 'password from pgpass for cde:5433', 'user': 'user', 'database': 'db', + 'session': True, } ) }) @@ -562,6 +581,7 @@ def test_connect_pgpass_regular(self): 'password': 'password from pgpass for testuser', 'user': 'testuser', 'database': 'db', + 'session': True, } ) }) @@ -578,6 +598,7 @@ def test_connect_pgpass_regular(self): 'password': 'password from pgpass for testdb', 'user': 'user', 'database': 'testdb', + 'session': True, } ) }) @@ -594,6 +615,7 @@ def test_connect_pgpass_regular(self): 'password': 'password from pgpass with escapes', 'user': R'test\\', 'database': R'test\:db', + 'session': True, } ) }) @@ -621,6 +643,7 @@ def test_connect_pgpass_badness_mode(self): { 'user': 'user', 'database': 'db', + 'session': True, } ) }) @@ -641,6 +664,7 @@ def test_connect_pgpass_badness_non_file(self): { 'user': 'user', 'database': 'db', + 'session': True, } ) }) @@ -657,6 +681,7 @@ def test_connect_pgpass_nonexistent(self): { 'user': 'user', 'database': 'db', + 'session': True, } ) })