diff --git a/tests/system/test_asyncpg_connection.py b/tests/system/test_asyncpg_connection.py index 22eda9a..5997f9c 100644 --- a/tests/system/test_asyncpg_connection.py +++ b/tests/system/test_asyncpg_connection.py @@ -13,35 +13,89 @@ # limitations under the License. import os +from typing import Tuple +# [START alloydb_sqlalchemy_connect_async_connector] import asyncpg import pytest import sqlalchemy -from sqlalchemy.ext.asyncio import create_async_engine +import sqlalchemy.ext.asyncio from google.cloud.alloydb.connector import AsyncConnector -@pytest.mark.asyncio -async def test_connection_with_asyncpg() -> None: - async with AsyncConnector() as connector: - - async def getconn() -> asyncpg.Connection: - conn: asyncpg.Connection = await connector.connect( - os.environ["ALLOYDB_INSTANCE_URI"], - "asyncpg", - user=os.environ["ALLOYDB_USER"], - password=os.environ["ALLOYDB_PASS"], - db=os.environ["ALLOYDB_DB"], - ) - return conn +async def create_sqlalchemy_engine( + inst_uri: str, + user: str, + password: str, + db: str, +) -> Tuple[sqlalchemy.ext.asyncio.engine.AsyncEngine, AsyncConnector]: + """Creates a connection pool for an AlloyDB instance and returns the pool + and the connector. Callers are responsible for closing the pool and the + connector. + + A sample invocation looks like: + + engine, connector = await create_sqlalchemy_engine( + inst_uri, + user, + password, + db, + ) + async with engine.connect() as conn: + time = await conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone() + curr_time = time[0] + # do something with query result + await connector.close() + + Args: + instance_uri (str): + The instance URI specifies the instance relative to the project, + region, and cluster. For example: + "projects/my-project/locations/us-central1/clusters/my-cluster/instances/my-instance" + user (str): + The database user name, e.g., postgres + password (str): + The database user's password, e.g., secret-password + db_name (str): + The name of the database, e.g., mydb + """ + connector = AsyncConnector() + + async def getconn() -> asyncpg.Connection: + conn: asyncpg.Connection = await connector.connect( + inst_uri, + "asyncpg", + user=user, + password=password, + db=db, + ) + return conn # create SQLAlchemy connection pool - pool = create_async_engine( + engine = sqlalchemy.ext.asyncio.create_async_engine( "postgresql+asyncpg://", async_creator=getconn, execution_options={"isolation_level": "AUTOCOMMIT"}, ) + return engine, connector + + +# [END alloydb_sqlalchemy_connect_async_connector] + + +@pytest.mark.asyncio +async def test_connection_with_asyncpg() -> None: + """Basic test to get time from database.""" + inst_uri = os.environ["ALLOYDB_INSTANCE_URI"] + user = os.environ["ALLOYDB_USER"] + password = os.environ["ALLOYDB_PASS"] + db = os.environ["ALLOYDB_DB"] + + pool, connector = await create_sqlalchemy_engine(inst_uri, user, password, db) + async with pool.connect() as conn: res = (await conn.execute(sqlalchemy.text("SELECT 1"))).fetchone() assert res[0] == 1 + + await connector.close()