diff --git a/iopipe/contrib/trace/auto_db.py b/iopipe/contrib/trace/auto_db.py index 04be4907..16a2e02d 100644 --- a/iopipe/contrib/trace/auto_db.py +++ b/iopipe/contrib/trace/auto_db.py @@ -2,6 +2,7 @@ import uuid import wrapt +from .dbapi import AdapterProxy, ConnectionProxy, CursorProxy, table_name from .util import ensure_utf8 Request = collections.namedtuple( @@ -9,6 +10,33 @@ ) +def collect_psycopg2_metrics(context, trace, instance): + from psycopg2.extensions import parse_dsn + + connection = instance.connection_proxy + dsn = parse_dsn(connection.dsn) + + db = dsn.get("dbname") + hostname = dsn.get("host", "localhost") + port = dsn.get("port", 5432) + + query = instance.query + command = query.split()[0].lower() + table = table_name(query, command) + + request = Request( + command=ensure_utf8(command), + key=None, + hostname=ensure_utf8(hostname), + port=ensure_utf8(port), + connectionName=None, + db=ensure_utf8(db), + table=ensure_utf8(table), + ) + request = request._asdict() + context.iopipe.mark.db_trace(trace, "postgresql", request) + + def collect_pymongo_metrics(context, trace, instance, response): from pymongo.cursor import Cursor from pymongo.results import ( @@ -126,6 +154,70 @@ def wrapper(wrapped, instance, args, kwargs): ) +def patch_psycopg2(context): + """ + Monkey patches psycopg2 client, if available. Overloads the + execute method to add tracing and metrics collection. + """ + + class PGCursorProxy(CursorProxy): + def execute(self, *args, **kwargs): + if not hasattr(context, "iopipe") or not hasattr( + context.iopipe, "mark" + ): # pragma: no cover + self.__wrapped__.execute(*args, **kwargs) + return + + id = ensure_utf8(str(uuid.uuid4())) + with context.iopipe.mark(id): + self.__wrapped__.execute(*args, **kwargs) + trace = context.iopipe.mark.measure(id) + context.iopipe.mark.delete(id) + collect_psycopg2_metrics(context, trace, self) + + class PGConnectionProxy(ConnectionProxy): + def cursor(self, *args, **kwargs): + cursor = self.__wrapped__.cursor(*args, **kwargs) + return PGCursorProxy(cursor, self) + + def adapt_wrapper(wrapped, instance, args, kwargs): + adapter = wrapped(*args, **kwargs) + return AdapterProxy(adapter) if hasattr(adapter, "prepare") else adapter + + def connect_wrapper(wrapped, instance, args, kwargs): + connection = wrapped(*args, **kwargs) + return PGConnectionProxy(connection, args, kwargs) + + def register_type_wrapper(wrapped, instance, args, kwargs): + def _extract_arguments(obj, scope=None): + return obj, scope + + obj, scope = _extract_arguments(*args, **kwargs) + + if scope is not None: + if isinstance(scope, wrapt.ObjectProxy): + scope = scope.__wrapped__ + return wrapped(obj, scope) + + return wrapped(obj) + + try: + wrapt.wrap_function_wrapper("psycopg2", "connect", connect_wrapper) + except Exception: # pragma: no cover + pass + else: + wrapt.wrap_function_wrapper("psycopg2.extensions", "adapt", adapt_wrapper) + wrapt.wrap_function_wrapper( + "psycopg2.extensions", "register_type", register_type_wrapper + ) + wrapt.wrap_function_wrapper( + "psycopg2._psycopg", "register_type", register_type_wrapper + ) + wrapt.wrap_function_wrapper( + "psycopg2._json", "register_type", register_type_wrapper + ) + + def patch_redis(context): """ Monkey patches redis client, if available. Overloads the @@ -179,6 +271,47 @@ def pipeline_wrapper(wrapped, instance, args, kwargs): # pragma: no cover wrapt.wrap_function_wrapper(module_name, class_method, pipeline_wrapper) +def restore_psycopg2(): + """Restores psycopg2""" + try: + import psycopg2 + except ImportError: # pragma: no cover + pass + else: + setattr( + psycopg2, + "connect", + getattr(psycopg2.connect, "__wrapped__", psycopg2.connect), + ) + setattr( + psycopg2.extensions, + "register_type", + getattr( + psycopg2.extensions.register_type, + "__wrapped__", + psycopg2.extensions.register_type, + ), + ) + setattr( + psycopg2._psycopg, + "register_type", + getattr( + psycopg2._psycopg.register_type, + "__wrapped__", + psycopg2._psycopg.register_type, + ), + ) + setattr( + psycopg2._json, + "register_type", + getattr( + psycopg2._json.register_type, + "__wrapped__", + psycopg2._json.register_type, + ), + ) + + def restore_pymongo(): """Restores pymongo""" try: @@ -224,10 +357,12 @@ def patch_db_requests(context): if not hasattr(context, "iopipe"): return + patch_psycopg2(context) patch_pymongo(context) patch_redis(context) def restore_db_requests(): + restore_psycopg2() restore_pymongo() restore_redis() diff --git a/iopipe/contrib/trace/dbapi.py b/iopipe/contrib/trace/dbapi.py new file mode 100644 index 00000000..29023cba --- /dev/null +++ b/iopipe/contrib/trace/dbapi.py @@ -0,0 +1,65 @@ +import wrapt + +COMMAND_KEYWORDS = { + "create": "table", + "delete": "from", + "insert": "into", + "select": "from", + "update": "update", +} + + +def table_name(query, command): + if command in COMMAND_KEYWORDS: + keyword = COMMAND_KEYWORDS[command] + parts = query.lower().split() + + if keyword in parts: + return parts[parts.index(keyword) + 1] + + +class CursorProxy(wrapt.ObjectProxy): + def __init__(self, cursor, connection_proxy): + super(CursorProxy, self).__init__(cursor) + + self._self_connection = connection_proxy + + @property + def connection_proxy(self): + return self._self_connection + + def execute(self, *args, **kwargs): + self.__wrapped__.execute(*args, **kwargs) + + +class ConnectionProxy(wrapt.ObjectProxy): + def __init__(self, connection, args, kwargs): + super(ConnectionProxy, self).__init__(connection) + + self._self_args = args + self._self_kwargs = kwargs + + def cursor(self, *args, **kwargs): # pragma: no cover + cursor = self.__wrapped__.cursor(*args, **kwargs) + return CursorProxy(cursor, self) + + @property + def extract_hostname(self): # pragma: no cover + return self._self_kwargs.get("host", "localhost") + + @property + def extract_dbname(self): # pragma: no cover + return self._self_kwargs.get("db", self._self_kwargs.get("database", "")) + + +class AdapterProxy(wrapt.ObjectProxy): + def prepare(self, *args, **kwargs): # pragma: no cover + if not args: + return self.__wrapped__.prepare(*args, **kwargs) + + connection = args[0] + + if isinstance(connection, wrapt.ObjectProxy): + connection = connection.__wrapped__ + + return self.__wrapped__.prepare(connection, *args[1:], **kwargs) diff --git a/setup.py b/setup.py index aa36e277..fdd9b2c4 100644 --- a/setup.py +++ b/setup.py @@ -21,6 +21,7 @@ "mock", "mongomock==3.17.0", "more-itertools<6.0.0", + "psycopg2-binary==2.8.3", "pymongo==3.8.0", "pytest==4.1.0", "pytest-benchmark==3.2.0", diff --git a/tests/contrib/trace/conftest.py b/tests/contrib/trace/conftest.py index 804c967c..09b49ba7 100644 --- a/tests/contrib/trace/conftest.py +++ b/tests/contrib/trace/conftest.py @@ -1,3 +1,4 @@ +import psycopg2 import pymongo import pytest import redis @@ -190,3 +191,16 @@ def _handler(event, context): db.my_collection.update_one({"x": 10}, {"$inc": {"x": 3}}) return iopipe_with_trace_auto_db, _handler + + +@pytest.fixture +def handler_with_trace_auto_db_psycopg2(iopipe_with_trace_auto_db): + @iopipe_with_trace_auto_db + def _handler(event, context): + conn = psycopg2.connect("postgres://username:password@localhost:5432/foobar") + cur = conn.cursor() + cur.execute("INSERT INTO test (num, data) VALUES (%s, %s)", (100, "abc'def")) + cur.execute("SELECT * FROM test;") + cur.fetchone() + + return iopipe_with_trace_auto_db, _handler diff --git a/tests/contrib/trace/test_auto_db.py b/tests/contrib/trace/test_auto_db.py index ec12f150..6933aca8 100644 --- a/tests/contrib/trace/test_auto_db.py +++ b/tests/contrib/trace/test_auto_db.py @@ -1,5 +1,7 @@ +import psycopg2 from pymongo.collection import Collection from redis.client import Pipeline, Redis +import wrapt from iopipe.contrib.trace.auto_db import patch_db_requests, restore_db_requests @@ -17,6 +19,8 @@ def test_patch_db_requests(mock_context_wrapper,): """Asserts that monkey patching occurs if iopipe present""" + assert not isinstance(psycopg2.connect, wrapt.ObjectProxy) + assert not hasattr(Redis.execute_command, "__wrapped__") assert not hasattr(Pipeline.execute, "__wrapped__") assert not hasattr(Pipeline.immediate_execute_command, "__wrapped__") @@ -26,6 +30,8 @@ def test_patch_db_requests(mock_context_wrapper,): patch_db_requests(mock_context_wrapper) + assert isinstance(psycopg2.connect, wrapt.ObjectProxy) + assert hasattr(Redis.execute_command, "__wrapped__") assert hasattr(Pipeline.execute, "__wrapped__") assert hasattr(Pipeline.immediate_execute_command, "__wrapped__") @@ -35,6 +41,8 @@ def test_patch_db_requests(mock_context_wrapper,): restore_db_requests() + assert not isinstance(psycopg2.connect, wrapt.ObjectProxy) + assert not hasattr(Redis.execute_command, "__wrapped__") assert not hasattr(Pipeline.execute, "__wrapped__") assert not hasattr(Pipeline.immediate_execute_command, "__wrapped__") @@ -45,6 +53,8 @@ def test_patch_db_requests(mock_context_wrapper,): def test_patch_db_requests_no_iopipe(mock_context,): """Asserts that monkey patching does not occur if IOpipe not present""" + assert not isinstance(psycopg2.connect, wrapt.ObjectProxy) + assert not hasattr(Redis.execute_command, "__wrapped__") assert not hasattr(Pipeline.execute, "__wrapped__") assert not hasattr(Pipeline.immediate_execute_command, "__wrapped__") @@ -56,6 +66,8 @@ def test_patch_db_requests_no_iopipe(mock_context,): patch_db_requests(mock_context) + assert not isinstance(psycopg2.connect, wrapt.ObjectProxy) + assert not hasattr(Redis.execute_command, "__wrapped__") assert not hasattr(Pipeline.execute, "__wrapped__") assert not hasattr(Pipeline.immediate_execute_command, "__wrapped__") diff --git a/tests/contrib/trace/test_plugin.py b/tests/contrib/trace/test_plugin.py index 212e0221..581cc9d4 100644 --- a/tests/contrib/trace/test_plugin.py +++ b/tests/contrib/trace/test_plugin.py @@ -275,3 +275,39 @@ def test_trace_plugin__auto_db__pymongo( assert db_traces[0]["request"]["command"] == "insert_one" assert db_traces[2]["request"]["command"] == "update" + + +@mock.patch("psycopg2.connect") +@mock.patch("iopipe.report.send_report", autospec=True) +def test_trace_plugin__auto_db__psycopg2( + mock_send_report, mock_connect, handler_with_trace_auto_db_psycopg2, mock_context +): + mock_connect.return_value.dsn = "postgres://username:password@localhost:5432/foobar" + type(mock_connect.return_value.cursor.return_value).query = mock.PropertyMock( + side_effect=[ + "INSERT INTO test (num, data) VALUES (%s, %s)", + "SELECT * FROM test", + ] + ) + + iopipe, handler = handler_with_trace_auto_db_psycopg2 + + assert len(iopipe.config["plugins"]) == 1 + + handler({}, mock_context) + + assert len(iopipe.report.performance_entries) == 0 + + db_traces = iopipe.report.db_trace_entries + + assert len(db_traces) == 2 + + for db_trace in db_traces: + assert db_trace["dbType"] == "postgresql" + assert db_trace["request"]["hostname"] == "localhost" + assert db_trace["request"]["port"] == "5432" + assert db_trace["request"]["db"] == "foobar" + assert db_trace["request"]["table"] == "test" + + assert db_traces[0]["request"]["command"] == "insert" + assert db_traces[1]["request"]["command"] == "select"