Skip to content

Compatibility with pgbouncer with pool_mode != session #348

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions asyncpg/connect_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
'database',
'ssl',
'connect_timeout',
'session',
'server_settings',
])

Expand Down Expand Up @@ -135,7 +136,7 @@ def _read_password_from_pgpass(

def _parse_connect_dsn_and_args(*, dsn, host, port, user,
password, passfile, database, ssl,
connect_timeout, server_settings):
connect_timeout, session, server_settings):
if host is not None and not isinstance(host, str):
raise TypeError(
'host argument is expected to be str, got {!r}'.format(
Expand Down Expand Up @@ -320,7 +321,8 @@ def _parse_connect_dsn_and_args(*, dsn, host, port, user,

params = _ConnectionParameters(
user=user, password=password, database=database, ssl=ssl,
connect_timeout=connect_timeout, server_settings=server_settings)
connect_timeout=connect_timeout, session=session,
server_settings=server_settings)

return addrs, params

Expand All @@ -330,7 +332,7 @@ def _parse_connect_arguments(*, dsn, host, port, user, password, passfile,
statement_cache_size,
max_cached_statement_lifetime,
max_cacheable_statement_size,
ssl, server_settings):
session, ssl, server_settings):

local_vars = locals()
for var_name in {'max_cacheable_statement_size',
Expand Down Expand Up @@ -359,7 +361,7 @@ def _parse_connect_arguments(*, dsn, host, port, user, password, passfile,
dsn=dsn, host=host, port=port, user=user,
password=password, passfile=passfile, ssl=ssl,
database=database, connect_timeout=timeout,
server_settings=server_settings)
session=session, server_settings=server_settings)

config = _ClientConfiguration(
command_timeout=command_timeout,
Expand Down
24 changes: 13 additions & 11 deletions asyncpg/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,13 +315,12 @@ async def _get_statement(self, query, timeout, *, named: bool=False,
len(query) > self._config.max_cacheable_statement_size):
use_cache = False

if use_cache or named:
if (use_cache or named) and self._protocol.session:
stmt_name = self._get_unique_id('stmt')
else:
stmt_name = ''

statement = await self._protocol.prepare(stmt_name, query, timeout)
need_reprepare = False
types_with_missing_codecs = statement._init_types()
tries = 0
while types_with_missing_codecs:
Expand All @@ -334,10 +333,6 @@ async def _get_statement(self, query, timeout, *, named: bool=False,

settings.register_data_types(types)

# The introspection query has used an anonymous statement,
# which has blown away the anonymous statement we've prepared
# for the query, so we need to re-prepare it.
need_reprepare = not intro_stmt.name and not statement.name
types_with_missing_codecs = statement._init_types()
tries += 1
if tries > 5:
Expand All @@ -354,10 +349,6 @@ async def _get_statement(self, query, timeout, *, named: bool=False,
# for the statement.
statement._init_codecs()

if need_reprepare:
await self._protocol.prepare(
stmt_name, query, timeout, state=statement)

if use_cache:
self._stmt_cache.put(query, statement)

Expand Down Expand Up @@ -1466,6 +1457,7 @@ async def connect(dsn=None, *,
statement_cache_size=100,
max_cached_statement_lifetime=300,
max_cacheable_statement_size=1024 * 15,
session=True,
command_timeout=None,
ssl=None,
connection_class=Connection,
Expand Down Expand Up @@ -1527,6 +1519,15 @@ async def connect(dsn=None, *,
default). Pass ``0`` to allow all statements to be cached
regardless of their size.

:param bool session:
if features which lifetime is PostgreSQL session can be used
by the client (the default is ``True``). Set this to ``False``
if connecting to ``pgbouncer`` with ``pool_mode`` set to
``transaction`` or ``statement``.
This will completely prevent usage of named prepared statements.
Unnmamed prepared statements will be used during transactions
only.

:param float command_timeout:
the default timeout for operations on this connection
(the default is ``None``: no timeout).
Expand Down Expand Up @@ -1599,7 +1600,8 @@ class of the returned connection object. Must be a subclass of
command_timeout=command_timeout,
statement_cache_size=statement_cache_size,
max_cached_statement_lifetime=max_cached_statement_lifetime,
max_cacheable_statement_size=max_cacheable_statement_size)
max_cacheable_statement_size=max_cacheable_statement_size,
session=session)


class _StatementCacheEntry:
Expand Down
14 changes: 3 additions & 11 deletions asyncpg/exceptions/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,17 +132,9 @@ def _make_constructor(cls, fields, query=None):
hint = dct.get('hint', '')
hint += textwrap.dedent("""\

NOTE: pgbouncer with pool_mode set to "transaction" or
"statement" does not support prepared statements properly.
You have two options:

* if you are using pgbouncer for connection pooling to a
single server, switch to the connection pool functionality
provided by asyncpg, it is a much better option for this
purpose;

* if you have no option of avoiding the use of pgbouncer,
then you must switch pgbouncer's pool_mode to "session".
if you are using pgbouncer with pool_mode set to "transaction"
or "statement", then you must initialize Connection with
session set to False.
""")

dct['hint'] = hint
Expand Down
25 changes: 16 additions & 9 deletions asyncpg/protocol/coreproto.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ cdef class CoreProtocol:
# True - completed, False - suspended
bint result_execute_completed

cpdef is_in_transaction(self)

cdef _process__auth(self, char mtype)
cdef _process__prepare(self, char mtype)
cdef _process__bind_execute(self, char mtype)
Expand Down Expand Up @@ -134,7 +136,6 @@ cdef class CoreProtocol:
cdef _auth_password_message_md5(self, bytes salt)

cdef _write(self, buf)
cdef inline _write_sync_message(self)

cdef _read_server_messages(self)

Expand All @@ -144,21 +145,27 @@ cdef class CoreProtocol:

cdef _ensure_connected(self)

cdef WriteBuffer _build_parse_message(self, str stmt_name, str query)
cdef WriteBuffer _build_describe_message(self, str stmt_name)
cdef WriteBuffer _build_bind_message(self, str portal_name,
str stmt_name,
WriteBuffer bind_data)
cdef WriteBuffer _build_execute_message(self, str portal_name,
int32_t limit)


cdef _connect(self)
cdef _prepare(self, str stmt_name, str query)
cdef _send_bind_message(self, str portal_name, str stmt_name,
WriteBuffer bind_data, int32_t limit)
cdef _bind_execute(self, str portal_name, str stmt_name,
WriteBuffer bind_data, int32_t limit)
cdef _bind_execute_many(self, str portal_name, str stmt_name,
object bind_data)
cdef _bind(self, str portal_name, str stmt_name,
WriteBuffer bind_data)
cdef _send_parse_bind_execute(self, str stmt_name, str query,
str portal_name, WriteBuffer bind_data,
int32_t limit)
cdef _parse_bind_execute(self, str stmt_name, str query,
str portal_name, WriteBuffer bind_data,
int32_t limit)
cdef _parse_bind_execute_many(self, str stmt_name, str query,
str portal_name, object bind_data)
cdef _parse_bind(self, str stmt_name, str query,
str portal_name, WriteBuffer bind_data)
cdef _execute(self, str portal_name, int32_t limit)
cdef _close(self, str name, bint is_portal)
cdef _simple_query(self, str query)
Expand Down
Loading