Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

aiomysql Instrumentation #1265

Merged
merged 10 commits into from
Dec 12, 2024
3 changes: 3 additions & 0 deletions newrelic/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3166,6 +3166,9 @@ def _process_module_builtin_defaults():
_process_module_definition("MySQLdb", "newrelic.hooks.database_mysqldb", "instrument_mysqldb")
_process_module_definition("pymysql", "newrelic.hooks.database_pymysql", "instrument_pymysql")

_process_module_definition("aiomysql", "newrelic.hooks.database_aiomysql", "instrument_aiomysql")
_process_module_definition("aiomysql.pool", "newrelic.hooks.database_aiomysql", "instrument_aiomysql_pool")

_process_module_definition("pyodbc", "newrelic.hooks.database_pyodbc", "instrument_pyodbc")

_process_module_definition("pymssql", "newrelic.hooks.database_pymssql", "instrument_pymssql")
Expand Down
106 changes: 106 additions & 0 deletions newrelic/hooks/database_aiomysql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# Copyright 2010 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sys

from newrelic.api.database_trace import register_database_client
from newrelic.api.function_trace import FunctionTrace
from newrelic.common.object_names import callable_name
from newrelic.common.object_wrapper import (
ObjectProxy,
wrap_function_wrapper,
wrap_object,
)
from newrelic.hooks.database_dbapi2_async import (
AsyncConnectionFactory as DBAPI2AsyncConnectionFactory,
)
from newrelic.hooks.database_dbapi2_async import (
AsyncConnectionWrapper as DBAPI2AsyncConnectionWrapper,
)
from newrelic.hooks.database_dbapi2_async import (
AsyncCursorWrapper as DBAPI2AsyncCursorWrapper,
)


class AsyncCursorContextManagerWrapper(ObjectProxy):

__cursor_wrapper__ = DBAPI2AsyncCursorWrapper

def __init__(self, context_manager, dbapi2_module, connect_params, cursor_args):
super().__init__(context_manager)
self._nr_dbapi2_module = dbapi2_module
self._nr_connect_params = connect_params
self._nr_cursor_args = cursor_args

async def __aenter__(self):
cursor = await self.__wrapped__.__aenter__()
return self.__cursor_wrapper__(cursor, self._nr_dbapi2_module, self._nr_connect_params, self._nr_cursor_args)

async def __aexit__(self, exc, val, tb):
return await self.__wrapped__.__aexit__(exc, val, tb)


class AsyncConnectionWrapper(DBAPI2AsyncConnectionWrapper):

__cursor_wrapper__ = AsyncCursorContextManagerWrapper


class AsyncConnectionFactory(DBAPI2AsyncConnectionFactory):

__connection_wrapper__ = AsyncConnectionWrapper


def wrap_pool__acquire(dbapi2_module):
async def _wrap_pool__acquire(wrapped, instance, args, kwargs):
rollup = ["Datastore/all", f"Datastore/{dbapi2_module._nr_database_product}/all"]

with FunctionTrace(name=callable_name(wrapped), terminal=True, rollup=rollup, source=wrapped):
connection = await wrapped(*args, **kwargs)
connection_kwargs = getattr(instance, "_conn_kwargs", {})
return AsyncConnectionWrapper(connection, dbapi2_module, (((), connection_kwargs)))

return _wrap_pool__acquire


def instance_info(args, kwargs):
def _bind_params(host=None, user=None, password=None, db=None, port=None, *args, **kwargs):
return host, port, db

host, port, db = _bind_params(*args, **kwargs)

return (host, port, db)


def instrument_aiomysql(module):
register_database_client(
module,
database_product="MySQL",
quoting_style="single+double",
explain_query="explain",
explain_stmts=("select",),
instance_info=instance_info,
)

# Only instrument the connect method directly, don't instrument
# Connection. This follows the DBAPI2 spec and what was done for
# PyMySQL which this library is based on.

wrap_object(module, "connect", AsyncConnectionFactory, (module,))


def instrument_aiomysql_pool(module):
dbapi2_module = sys.modules["aiomysql"]
if hasattr(module, "Pool"):
if hasattr(module.Pool, "_acquire"):
wrap_function_wrapper(module, "Pool._acquire", wrap_pool__acquire(dbapi2_module))
38 changes: 38 additions & 0 deletions tests/datastore_aiomysql/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Copyright 2010 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from testing_support.fixture.event_loop import ( # noqa: F401; pylint: disable=W0611
event_loop as loop,
)
from testing_support.fixtures import ( # noqa: F401; pylint: disable=W0611
collector_agent_registration_fixture,
collector_available_fixture,
)

_default_settings = {
"package_reporting.enabled": False, # Turn off package reporting for testing as it causes slow downs.
"transaction_tracer.explain_threshold": 0.0,
"transaction_tracer.transaction_threshold": 0.0,
"transaction_tracer.stack_trace_threshold": 0.0,
"debug.log_data_collector_payloads": True,
"debug.record_transaction_failure": True,
"debug.log_explain_plan_queries": True,
}

collector_agent_registration = collector_agent_registration_fixture(
app_name="Python Agent Test (datastore_aiomysql)",
default_settings=_default_settings,
linked_applications=["Python Agent Test (datastore)"],
)
157 changes: 157 additions & 0 deletions tests/datastore_aiomysql/test_database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
# Copyright 2010 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import aiomysql
from testing_support.db_settings import mysql_settings
from testing_support.util import instance_hostname
from testing_support.validators.validate_database_trace_inputs import (
validate_database_trace_inputs,
)
from testing_support.validators.validate_transaction_metrics import (
validate_transaction_metrics,
)

from newrelic.api.background_task import background_task

DB_SETTINGS = mysql_settings()[0]
TABLE_NAME = f"datastore_aiomysql_{DB_SETTINGS['namespace']}"
PROCEDURE_NAME = f"hello_{DB_SETTINGS['namespace']}"

HOST = instance_hostname(DB_SETTINGS["host"])
PORT = DB_SETTINGS["port"]


async def execute_db_calls_with_cursor(cursor):
await cursor.execute(f"""drop table if exists {TABLE_NAME}""")

await cursor.execute(f"create table {TABLE_NAME} (a integer, b real, c text)")

await cursor.executemany(
f"insert into {TABLE_NAME} values (%s, %s, %s)",
[(1, 1.0, "1.0"), (2, 2.2, "2.2"), (3, 3.3, "3.3")],
)

await cursor.execute(f"""select * from {TABLE_NAME}""")

async for _ in cursor:
pass

await cursor.execute(f"update {TABLE_NAME} set a=%s, b=%s, c=%s where a=%s", (4, 4.0, "4.0", 1))

await cursor.execute(f"""delete from {TABLE_NAME} where a=2""")
await cursor.execute(f"""drop procedure if exists {PROCEDURE_NAME}""")
await cursor.execute(
f"""CREATE PROCEDURE {PROCEDURE_NAME}()
BEGIN
SELECT 'Hello World!';
END"""
)

await cursor.callproc(PROCEDURE_NAME)


SCOPED_METRICS = [
(f"Datastore/statement/MySQL/{TABLE_NAME}/select", 1),
(f"Datastore/statement/MySQL/{TABLE_NAME}/insert", 1),
(f"Datastore/statement/MySQL/{TABLE_NAME}/update", 1),
(f"Datastore/statement/MySQL/{TABLE_NAME}/delete", 1),
("Datastore/operation/MySQL/drop", 2),
("Datastore/operation/MySQL/create", 2),
(f"Datastore/statement/MySQL/{PROCEDURE_NAME}/call", 1),
("Datastore/operation/MySQL/commit", 2),
("Datastore/operation/MySQL/rollback", 1),
]

ROLLUP_METRICS = [
("Datastore/all", 13),
("Datastore/allOther", 13),
("Datastore/MySQL/all", 13),
("Datastore/MySQL/allOther", 13),
(f"Datastore/statement/MySQL/{TABLE_NAME}/select", 1),
(f"Datastore/statement/MySQL/{TABLE_NAME}/insert", 1),
(f"Datastore/statement/MySQL/{TABLE_NAME}/update", 1),
(f"Datastore/statement/MySQL/{TABLE_NAME}/delete", 1),
("Datastore/operation/MySQL/select", 1),
("Datastore/operation/MySQL/insert", 1),
("Datastore/operation/MySQL/update", 1),
("Datastore/operation/MySQL/delete", 1),
(f"Datastore/statement/MySQL/{PROCEDURE_NAME}/call", 1),
("Datastore/operation/MySQL/call", 1),
("Datastore/operation/MySQL/drop", 2),
("Datastore/operation/MySQL/create", 2),
("Datastore/operation/MySQL/commit", 2),
("Datastore/operation/MySQL/rollback", 1),
(f"Datastore/instance/MySQL/{HOST}/{PORT}", 12),
]


@validate_transaction_metrics(
"test_database:test_execute_via_connection",
scoped_metrics=list(SCOPED_METRICS) + [("Function/aiomysql.connection:connect", 1)],
rollup_metrics=list(ROLLUP_METRICS) + [("Function/aiomysql.connection:connect", 1)],
background_task=True,
)
@validate_database_trace_inputs(sql_parameters_type=tuple)
@background_task()
def test_execute_via_connection(loop):
async def _test():
connection = await aiomysql.connect(
db=DB_SETTINGS["name"],
user=DB_SETTINGS["user"],
password=DB_SETTINGS["password"],
host=DB_SETTINGS["host"],
port=DB_SETTINGS["port"],
)

async with connection:
async with connection.cursor() as cursor:
await execute_db_calls_with_cursor(cursor)

await connection.commit()
await connection.rollback()
await connection.commit()

loop.run_until_complete(_test())


@validate_transaction_metrics(
"test_database:test_execute_via_pool",
scoped_metrics=list(SCOPED_METRICS) + [("Function/aiomysql.pool:Pool._acquire", 1)],
rollup_metrics=list(ROLLUP_METRICS) + [("Function/aiomysql.pool:Pool._acquire", 1)],
background_task=True,
)
@validate_database_trace_inputs(sql_parameters_type=tuple)
@background_task()
def test_execute_via_pool(loop):
async def _test():
pool = await aiomysql.create_pool(
db=DB_SETTINGS["name"],
user=DB_SETTINGS["user"],
password=DB_SETTINGS["password"],
host=DB_SETTINGS["host"],
port=DB_SETTINGS["port"],
loop=loop,
)
async with pool.acquire() as connection:
async with connection.cursor() as cursor:
await execute_db_calls_with_cursor(cursor)

await connection.commit()
await connection.rollback()
await connection.commit()

pool.close()
await pool.wait_closed()

loop.run_until_complete(_test())
6 changes: 5 additions & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ envlist =
mongodb8-datastore_motor-{py37,py38,py39,py310,py311,py312,py313}-motorlatest,
mongodb3-datastore_pymongo-{py37,py38,py39,py310,py311,py312}-pymongo03,
mongodb8-datastore_pymongo-{py37,py38,py39,py310,py311,py312,py313,pypy310}-pymongo04,
mysql-datastore_aiomysql-{py37,py38,py39,py310,py311,py312,py313,pypy310},
mssql-datastore_pymssql-{py37,py38,py39,py310,py311,py312,py313},
mysql-datastore_mysql-mysqllatest-{py37,py38,py39,py310,py311,py312,py313},
mysql-datastore_pymysql-{py37,py38,py39,py310,py311,py312,py313,pypy310},
Expand Down Expand Up @@ -251,6 +252,8 @@ deps =
cross_agent: requests
datastore_asyncpg: asyncpg
datastore_aiomcache: aiomcache
datastore_aiomysql: aiomysql
datastore_aiomysql: cryptography
datastore_bmemcached: python-binary-memcached
datastore_elasticsearch: requests
datastore_elasticsearch-elasticsearch07: elasticsearch<8.0
Expand Down Expand Up @@ -466,9 +469,10 @@ changedir =
component_tastypie: tests/component_tastypie
coroutines_asyncio: tests/coroutines_asyncio
cross_agent: tests/cross_agent
datastore_aiomcache: tests/datastore_aiomcache
datastore_aiomysql: tests/datastore_aiomysql
datastore_asyncpg: tests/datastore_asyncpg
datastore_bmemcached: tests/datastore_bmemcached
datastore_aiomcache: tests/datastore_aiomcache
datastore_elasticsearch: tests/datastore_elasticsearch
datastore_firestore: tests/datastore_firestore
datastore_memcache: tests/datastore_memcache
Expand Down
Loading