From eb95a9da35553387408e425721449660cc83196d Mon Sep 17 00:00:00 2001 From: Jack Wotherspoon Date: Fri, 25 Oct 2024 15:38:49 -0400 Subject: [PATCH] feat: support native asyncpg connection pools (#1182) As of asyncpg v0.30.0, asyncpg native connection pools now support a creation function (callable) via its connect argument, similar to SQLAlchemy's async_creator argument to generate connections. Adding integration test and usage samples to README. Also bumping asyncpg min supported version in setup.py to 0.30.0 to force supported version for this feature. --- README.md | 92 ++++++++++++++++++--- setup.py | 2 +- tests/system/test_asyncpg_connection.py | 101 +++++++++++++++++++++++- 3 files changed, 180 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index c3f4c47a..28553f97 100644 --- a/README.md +++ b/README.md @@ -502,6 +502,42 @@ The `create_async_connector` allows all the same input arguments as the Once a `Connector` object is returned by `create_async_connector` you can call its `connect_async` method, just as you would the `connect` method: +#### Asyncpg Connection Pool + +```python +import asyncpg +from google.cloud.sql.connector import Connector, create_async_connector + +async def main(): + # initialize Connector object for connections to Cloud SQL + connector = create_async_connector() + + # creation function to generate asyncpg connections as the 'connect' arg + async def getconn(instance_connection_name, **kwargs) -> asyncpg.Connection: + return await connector.connect_async( + instance_connection_name, + "asyncpg", + user="my-user", + password="my-password", + db="my-db", + **kwargs, # ... additional asyncpg args + ) + + # initialize connection pool + pool = await asyncpg.create_pool( + "my-project:my-region:my-instance", connect=getconn + ) + + # acquire connection and query Cloud SQL database + async with pool.acquire() as conn: + res = await conn.fetch("SELECT NOW()") + + # close Connector + await connector.close_async() +``` + +#### SQLAlchemy Async Engine + ```python import asyncpg @@ -511,7 +547,7 @@ from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine from google.cloud.sql.connector import Connector, create_async_connector async def init_connection_pool(connector: Connector) -> AsyncEngine: - # initialize Connector object for connections to Cloud SQL + # creation function to generate asyncpg connections as 'async_creator' arg async def getconn() -> asyncpg.Connection: conn: asyncpg.Connection = await connector.connect_async( "project:region:instance", # Cloud SQL instance connection name @@ -564,6 +600,40 @@ calls to `connector.close_async()` to cleanup resources. > This alternative requires that the running event loop be > passed in as the `loop` argument to `Connector()`. +#### Asyncpg Connection Pool + +```python +import asyncpg +from google.cloud.sql.connector import Connector, create_async_connector + +async def main(): + # initialize Connector object for connections to Cloud SQL + loop = asyncio.get_running_loop() + async with Connector(loop=loop) as connector: + + # creation function to generate asyncpg connections as the 'connect' arg + async def getconn(instance_connection_name, **kwargs) -> asyncpg.Connection: + return await connector.connect_async( + instance_connection_name, + "asyncpg", + user="my-user", + password="my-password", + db="my-db", + **kwargs, # ... additional asyncpg args + ) + + # create connection pool + pool = await asyncpg.create_pool( + "my-project:my-region:my-instance", connect=getconn + ) + + # acquire connection and query Cloud SQL database + async with pool.acquire() as conn: + res = await conn.fetch("SELECT NOW()") +``` + +#### SQLAlchemy Async Engine + ```python import asyncio import asyncpg @@ -574,17 +644,17 @@ from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine from google.cloud.sql.connector import Connector async def init_connection_pool(connector: Connector) -> AsyncEngine: - # initialize Connector object for connections to Cloud SQL + # creation function to generate asyncpg connections as 'async_creator' arg async def getconn() -> asyncpg.Connection: - conn: asyncpg.Connection = await connector.connect_async( - "project:region:instance", # Cloud SQL instance connection name - "asyncpg", - user="my-user", - password="my-password", - db="my-db-name" - # ... additional database driver args - ) - return conn + conn: asyncpg.Connection = await connector.connect_async( + "project:region:instance", # Cloud SQL instance connection name + "asyncpg", + user="my-user", + password="my-password", + db="my-db-name" + # ... additional database driver args + ) + return conn # The Cloud SQL Python Connector can be used along with SQLAlchemy using the # 'async_creator' argument to 'create_async_engine' diff --git a/setup.py b/setup.py index bdf7a27c..bb70449a 100644 --- a/setup.py +++ b/setup.py @@ -80,7 +80,7 @@ "pymysql": ["PyMySQL>=1.1.0"], "pg8000": ["pg8000>=1.31.1"], "pytds": ["python-tds>=1.15.0"], - "asyncpg": ["asyncpg>=0.29.0"], + "asyncpg": ["asyncpg>=0.30.0"], }, python_requires=">=3.9", include_package_data=True, diff --git a/tests/system/test_asyncpg_connection.py b/tests/system/test_asyncpg_connection.py index 20715c65..98a86a1a 100644 --- a/tests/system/test_asyncpg_connection.py +++ b/tests/system/test_asyncpg_connection.py @@ -16,7 +16,7 @@ import asyncio import os -from typing import Tuple +from typing import Any, Tuple import asyncpg import sqlalchemy @@ -88,7 +88,68 @@ async def getconn() -> asyncpg.Connection: return engine, connector -async def test_connection_with_asyncpg() -> None: +async def create_asyncpg_pool( + instance_connection_name: str, + user: str, + password: str, + db: str, + refresh_strategy: str = "background", +) -> Tuple[asyncpg.Pool, Connector]: + """Creates a native asyncpg connection pool for a Cloud SQL instance and + returns the pool and the connector. Callers are responsible for closing the + pool and the connector. + + A sample invocation looks like: + + pool, connector = await create_asyncpg_pool( + inst_conn_name, + user, + password, + db, + ) + async with pool.acquire() as conn: + hello = await conn.fetch("SELECT 'Hello World!'") + # do something with query result + await connector.close_async() + + Args: + instance_connection_name (str): + The instance connection name specifies the instance relative to the + project and region. For example: "my-project:my-region:my-instance" + user (str): + The database user name, e.g., postgres + password (str): + The database user's password, e.g., secret-password + db (str): + The name of the database, e.g., mydb + refresh_strategy (Optional[str]): + Refresh strategy for the Cloud SQL Connector. Can be one of "lazy" + or "background". For serverless environments use "lazy" to avoid + errors resulting from CPU being throttled. + """ + loop = asyncio.get_running_loop() + connector = Connector(loop=loop, refresh_strategy=refresh_strategy) + + async def getconn( + instance_connection_name: str, **kwargs: Any + ) -> asyncpg.Connection: + conn: asyncpg.Connection = await connector.connect_async( + instance_connection_name, + "asyncpg", + user=user, + password=password, + db=db, + ip_type="public", # can also be "private" or "psc", + **kwargs + ) + return conn + + # create native asyncpg pool (requires asyncpg version >=0.30.0) + pool = await asyncpg.create_pool(instance_connection_name, connect=getconn) + return pool, connector + + +async def test_sqlalchemy_connection_with_asyncpg() -> None: """Basic test to get time from database.""" inst_conn_name = os.environ["POSTGRES_CONNECTION_NAME"] user = os.environ["POSTGRES_USER"] @@ -104,7 +165,7 @@ async def test_connection_with_asyncpg() -> None: await connector.close_async() -async def test_lazy_connection_with_asyncpg() -> None: +async def test_lazy_sqlalchemy_connection_with_asyncpg() -> None: """Basic test to get time from database.""" inst_conn_name = os.environ["POSTGRES_CONNECTION_NAME"] user = os.environ["POSTGRES_USER"] @@ -120,3 +181,37 @@ async def test_lazy_connection_with_asyncpg() -> None: assert res[0] == 1 await connector.close_async() + + +async def test_connection_with_asyncpg() -> None: + """Basic test to get time from database.""" + inst_conn_name = os.environ["POSTGRES_CONNECTION_NAME"] + user = os.environ["POSTGRES_USER"] + password = os.environ["POSTGRES_PASS"] + db = os.environ["POSTGRES_DB"] + + pool, connector = await create_asyncpg_pool(inst_conn_name, user, password, db) + + async with pool.acquire() as conn: + res = await conn.fetch("SELECT 1") + assert res[0][0] == 1 + + await connector.close_async() + + +async def test_lazy_connection_with_asyncpg() -> None: + """Basic test to get time from database.""" + inst_conn_name = os.environ["POSTGRES_CONNECTION_NAME"] + user = os.environ["POSTGRES_USER"] + password = os.environ["POSTGRES_PASS"] + db = os.environ["POSTGRES_DB"] + + pool, connector = await create_asyncpg_pool( + inst_conn_name, user, password, db, "lazy" + ) + + async with pool.acquire() as conn: + res = await conn.fetch("SELECT 1") + assert res[0][0] == 1 + + await connector.close_async()