Skip to content

Add copy_ wrappers to Pool #661

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Aug 10, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 182 additions & 14 deletions asyncpg/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ async def execute(self, query: str, *args, timeout: float=None) -> str:

Pool performs this operation using one of its connections. Other than
that, it behaves identically to
:meth:`Connection.execute() <connection.Connection.execute>`.
:meth:`Connection.execute() <asyncpg.connection.Connection.execute>`.

.. versionadded:: 0.10.0
"""
Expand All @@ -534,7 +534,8 @@ async def executemany(self, command: str, args, *, timeout: float=None):

Pool performs this operation using one of its connections. Other than
that, it behaves identically to
:meth:`Connection.executemany() <connection.Connection.executemany>`.
:meth:`Connection.executemany()
<asyncpg.connection.Connection.executemany>`.

.. versionadded:: 0.10.0
"""
Expand All @@ -546,7 +547,7 @@ async def fetch(self, query, *args, timeout=None) -> list:

Pool performs this operation using one of its connections. Other than
that, it behaves identically to
:meth:`Connection.fetch() <connection.Connection.fetch>`.
:meth:`Connection.fetch() <asyncpg.connection.Connection.fetch>`.

.. versionadded:: 0.10.0
"""
Expand All @@ -558,7 +559,8 @@ async def fetchval(self, query, *args, column=0, timeout=None):

Pool performs this operation using one of its connections. Other than
that, it behaves identically to
:meth:`Connection.fetchval() <connection.Connection.fetchval>`.
:meth:`Connection.fetchval()
<asyncpg.connection.Connection.fetchval>`.

.. versionadded:: 0.10.0
"""
Expand All @@ -571,13 +573,178 @@ async def fetchrow(self, query, *args, timeout=None):

Pool performs this operation using one of its connections. Other than
that, it behaves identically to
:meth:`Connection.fetchrow() <connection.Connection.fetchrow>`.
:meth:`Connection.fetchrow() <asyncpg.connection.Connection.fetchrow>`.

.. versionadded:: 0.10.0
"""
async with self.acquire() as con:
return await con.fetchrow(query, *args, timeout=timeout)

async def copy_from_table(
self,
table_name,
*,
output,
columns=None,
schema_name=None,
timeout=None,
format=None,
oids=None,
delimiter=None,
null=None,
header=None,
quote=None,
escape=None,
force_quote=None,
encoding=None
):
"""Copy table contents to a file or file-like object.

Pool performs this operation using one of its connections. Other than
that, it behaves identically to
:meth:`Connection.copy_from_table()
<asyncpg.connection.Connection.copy_from_table>`.

.. versionadded:: 0.24.0
"""
async with self.acquire() as con:
return await con.copy_from_table(
table_name,
output=output,
columns=columns,
schema_name=schema_name,
timeout=timeout,
format=format,
oids=oids,
delimiter=delimiter,
null=null,
header=header,
quote=quote,
escape=escape,
force_quote=force_quote,
encoding=encoding
)

async def copy_from_query(
self,
query,
*args,
output,
timeout=None,
format=None,
oids=None,
delimiter=None,
null=None,
header=None,
quote=None,
escape=None,
force_quote=None,
encoding=None
):
"""Copy the results of a query to a file or file-like object.

Pool performs this operation using one of its connections. Other than
that, it behaves identically to
:meth:`Connection.copy_from_query()
<asyncpg.connection.Connection.copy_from_query>`.

.. versionadded:: 0.24.0
"""
async with self.acquire() as con:
return await con.copy_from_query(
query,
*args,
output=output,
timeout=timeout,
format=format,
oids=oids,
delimiter=delimiter,
null=null,
header=header,
quote=quote,
escape=escape,
force_quote=force_quote,
encoding=encoding
)

async def copy_to_table(
self,
table_name,
*,
source,
columns=None,
schema_name=None,
timeout=None,
format=None,
oids=None,
freeze=None,
delimiter=None,
null=None,
header=None,
quote=None,
escape=None,
force_quote=None,
force_not_null=None,
force_null=None,
encoding=None
):
"""Copy data to the specified table.

Pool performs this operation using one of its connections. Other than
that, it behaves identically to
:meth:`Connection.copy_to_table()
<asyncpg.connection.Connection.copy_to_table>`.

.. versionadded:: 0.24.0
"""
async with self.acquire() as con:
return await con.copy_to_table(
table_name,
source=source,
columns=columns,
schema_name=schema_name,
timeout=timeout,
format=format,
oids=oids,
freeze=freeze,
delimiter=delimiter,
null=null,
header=header,
quote=quote,
escape=escape,
force_quote=force_quote,
force_not_null=force_not_null,
force_null=force_null,
encoding=encoding
)

async def copy_records_to_table(
self,
table_name,
*,
records,
columns=None,
schema_name=None,
timeout=None
):
"""Copy a list of records to the specified table using binary COPY.

Pool performs this operation using one of its connections. Other than
that, it behaves identically to
:meth:`Connection.copy_records_to_table()
<asyncpg.connection.Connection.copy_records_to_table>`.

.. versionadded:: 0.24.0
"""
async with self.acquire() as con:
return await con.copy_records_to_table(
table_name,
records=records,
columns=columns,
schema_name=schema_name,
timeout=timeout
)

def acquire(self, *, timeout=None):
"""Acquire a database connection from the pool.

Expand Down Expand Up @@ -844,12 +1011,12 @@ def create_pool(dsn=None, *,

.. warning::
Prepared statements and cursors returned by
:meth:`Connection.prepare() <connection.Connection.prepare>` and
:meth:`Connection.cursor() <connection.Connection.cursor>` become
invalid once the connection is released. Likewise, all notification
and log listeners are removed, and ``asyncpg`` will issue a warning
if there are any listener callbacks registered on a connection that
is being released to the pool.
:meth:`Connection.prepare() <asyncpg.connection.Connection.prepare>`
and :meth:`Connection.cursor() <asyncpg.connection.Connection.cursor>`
become invalid once the connection is released. Likewise, all
notification and log listeners are removed, and ``asyncpg`` will
issue a warning if there are any listener callbacks registered on a
connection that is being released to the pool.

:param str dsn:
Connection arguments specified using as a single string in
Expand Down Expand Up @@ -915,10 +1082,11 @@ def create_pool(dsn=None, *,
.. versionchanged:: 0.13.0
An :exc:`~asyncpg.exceptions.InterfaceWarning` will be produced
if there are any active listeners (added via
:meth:`Connection.add_listener() <connection.Connection.add_listener>`
:meth:`Connection.add_listener()
<asyncpg.connection.Connection.add_listener>`
or :meth:`Connection.add_log_listener()
<connection.Connection.add_log_listener>`) present on the connection
at the moment of its release to the pool.
<asyncpg.connection.Connection.add_log_listener>`) present on the
connection at the moment of its release to the pool.

.. versionchanged:: 0.22.0
Added the *record_class* parameter.
Expand Down