Skip to content

Commit

Permalink
Add psycopg2 wrapper (#350)
Browse files Browse the repository at this point in the history
* Add psycopg2 wrapper

Closes #344

* Add psycopg2 metric collection

* Adding tests

* Use psycopg2-binary

* Add pragma for overridden methods

* Update tests to assert trace values
  • Loading branch information
kolanos authored Aug 16, 2019
1 parent ad4538e commit 58f1969
Show file tree
Hide file tree
Showing 6 changed files with 263 additions and 0 deletions.
135 changes: 135 additions & 0 deletions iopipe/contrib/trace/auto_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,41 @@
import uuid
import wrapt

from .dbapi import AdapterProxy, ConnectionProxy, CursorProxy, table_name
from .util import ensure_utf8

Request = collections.namedtuple(
"Request", ["command", "key", "hostname", "port", "connectionName", "db", "table"]
)


def collect_psycopg2_metrics(context, trace, instance):
from psycopg2.extensions import parse_dsn

connection = instance.connection_proxy
dsn = parse_dsn(connection.dsn)

db = dsn.get("dbname")
hostname = dsn.get("host", "localhost")
port = dsn.get("port", 5432)

query = instance.query
command = query.split()[0].lower()
table = table_name(query, command)

request = Request(
command=ensure_utf8(command),
key=None,
hostname=ensure_utf8(hostname),
port=ensure_utf8(port),
connectionName=None,
db=ensure_utf8(db),
table=ensure_utf8(table),
)
request = request._asdict()
context.iopipe.mark.db_trace(trace, "postgresql", request)


def collect_pymongo_metrics(context, trace, instance, response):
from pymongo.cursor import Cursor
from pymongo.results import (
Expand Down Expand Up @@ -126,6 +154,70 @@ def wrapper(wrapped, instance, args, kwargs):
)


def patch_psycopg2(context):
"""
Monkey patches psycopg2 client, if available. Overloads the
execute method to add tracing and metrics collection.
"""

class PGCursorProxy(CursorProxy):
def execute(self, *args, **kwargs):
if not hasattr(context, "iopipe") or not hasattr(
context.iopipe, "mark"
): # pragma: no cover
self.__wrapped__.execute(*args, **kwargs)
return

id = ensure_utf8(str(uuid.uuid4()))
with context.iopipe.mark(id):
self.__wrapped__.execute(*args, **kwargs)
trace = context.iopipe.mark.measure(id)
context.iopipe.mark.delete(id)
collect_psycopg2_metrics(context, trace, self)

class PGConnectionProxy(ConnectionProxy):
def cursor(self, *args, **kwargs):
cursor = self.__wrapped__.cursor(*args, **kwargs)
return PGCursorProxy(cursor, self)

def adapt_wrapper(wrapped, instance, args, kwargs):
adapter = wrapped(*args, **kwargs)
return AdapterProxy(adapter) if hasattr(adapter, "prepare") else adapter

def connect_wrapper(wrapped, instance, args, kwargs):
connection = wrapped(*args, **kwargs)
return PGConnectionProxy(connection, args, kwargs)

def register_type_wrapper(wrapped, instance, args, kwargs):
def _extract_arguments(obj, scope=None):
return obj, scope

obj, scope = _extract_arguments(*args, **kwargs)

if scope is not None:
if isinstance(scope, wrapt.ObjectProxy):
scope = scope.__wrapped__
return wrapped(obj, scope)

return wrapped(obj)

try:
wrapt.wrap_function_wrapper("psycopg2", "connect", connect_wrapper)
except Exception: # pragma: no cover
pass
else:
wrapt.wrap_function_wrapper("psycopg2.extensions", "adapt", adapt_wrapper)
wrapt.wrap_function_wrapper(
"psycopg2.extensions", "register_type", register_type_wrapper
)
wrapt.wrap_function_wrapper(
"psycopg2._psycopg", "register_type", register_type_wrapper
)
wrapt.wrap_function_wrapper(
"psycopg2._json", "register_type", register_type_wrapper
)


def patch_redis(context):
"""
Monkey patches redis client, if available. Overloads the
Expand Down Expand Up @@ -179,6 +271,47 @@ def pipeline_wrapper(wrapped, instance, args, kwargs): # pragma: no cover
wrapt.wrap_function_wrapper(module_name, class_method, pipeline_wrapper)


def restore_psycopg2():
"""Restores psycopg2"""
try:
import psycopg2
except ImportError: # pragma: no cover
pass
else:
setattr(
psycopg2,
"connect",
getattr(psycopg2.connect, "__wrapped__", psycopg2.connect),
)
setattr(
psycopg2.extensions,
"register_type",
getattr(
psycopg2.extensions.register_type,
"__wrapped__",
psycopg2.extensions.register_type,
),
)
setattr(
psycopg2._psycopg,
"register_type",
getattr(
psycopg2._psycopg.register_type,
"__wrapped__",
psycopg2._psycopg.register_type,
),
)
setattr(
psycopg2._json,
"register_type",
getattr(
psycopg2._json.register_type,
"__wrapped__",
psycopg2._json.register_type,
),
)


def restore_pymongo():
"""Restores pymongo"""
try:
Expand Down Expand Up @@ -224,10 +357,12 @@ def patch_db_requests(context):
if not hasattr(context, "iopipe"):
return

patch_psycopg2(context)
patch_pymongo(context)
patch_redis(context)


def restore_db_requests():
restore_psycopg2()
restore_pymongo()
restore_redis()
65 changes: 65 additions & 0 deletions iopipe/contrib/trace/dbapi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import wrapt

COMMAND_KEYWORDS = {
"create": "table",
"delete": "from",
"insert": "into",
"select": "from",
"update": "update",
}


def table_name(query, command):
if command in COMMAND_KEYWORDS:
keyword = COMMAND_KEYWORDS[command]
parts = query.lower().split()

if keyword in parts:
return parts[parts.index(keyword) + 1]


class CursorProxy(wrapt.ObjectProxy):
def __init__(self, cursor, connection_proxy):
super(CursorProxy, self).__init__(cursor)

self._self_connection = connection_proxy

@property
def connection_proxy(self):
return self._self_connection

def execute(self, *args, **kwargs):
self.__wrapped__.execute(*args, **kwargs)


class ConnectionProxy(wrapt.ObjectProxy):
def __init__(self, connection, args, kwargs):
super(ConnectionProxy, self).__init__(connection)

self._self_args = args
self._self_kwargs = kwargs

def cursor(self, *args, **kwargs): # pragma: no cover
cursor = self.__wrapped__.cursor(*args, **kwargs)
return CursorProxy(cursor, self)

@property
def extract_hostname(self): # pragma: no cover
return self._self_kwargs.get("host", "localhost")

@property
def extract_dbname(self): # pragma: no cover
return self._self_kwargs.get("db", self._self_kwargs.get("database", ""))


class AdapterProxy(wrapt.ObjectProxy):
def prepare(self, *args, **kwargs): # pragma: no cover
if not args:
return self.__wrapped__.prepare(*args, **kwargs)

connection = args[0]

if isinstance(connection, wrapt.ObjectProxy):
connection = connection.__wrapped__

return self.__wrapped__.prepare(connection, *args[1:], **kwargs)
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"mock",
"mongomock==3.17.0",
"more-itertools<6.0.0",
"psycopg2-binary==2.8.3",
"pymongo==3.8.0",
"pytest==4.1.0",
"pytest-benchmark==3.2.0",
Expand Down
14 changes: 14 additions & 0 deletions tests/contrib/trace/conftest.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import psycopg2
import pymongo
import pytest
import redis
Expand Down Expand Up @@ -190,3 +191,16 @@ def _handler(event, context):
db.my_collection.update_one({"x": 10}, {"$inc": {"x": 3}})

return iopipe_with_trace_auto_db, _handler


@pytest.fixture
def handler_with_trace_auto_db_psycopg2(iopipe_with_trace_auto_db):
@iopipe_with_trace_auto_db
def _handler(event, context):
conn = psycopg2.connect("postgres://username:password@localhost:5432/foobar")
cur = conn.cursor()
cur.execute("INSERT INTO test (num, data) VALUES (%s, %s)", (100, "abc'def"))
cur.execute("SELECT * FROM test;")
cur.fetchone()

return iopipe_with_trace_auto_db, _handler
12 changes: 12 additions & 0 deletions tests/contrib/trace/test_auto_db.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import psycopg2
from pymongo.collection import Collection
from redis.client import Pipeline, Redis
import wrapt

from iopipe.contrib.trace.auto_db import patch_db_requests, restore_db_requests

Expand All @@ -17,6 +19,8 @@

def test_patch_db_requests(mock_context_wrapper,):
"""Asserts that monkey patching occurs if iopipe present"""
assert not isinstance(psycopg2.connect, wrapt.ObjectProxy)

assert not hasattr(Redis.execute_command, "__wrapped__")
assert not hasattr(Pipeline.execute, "__wrapped__")
assert not hasattr(Pipeline.immediate_execute_command, "__wrapped__")
Expand All @@ -26,6 +30,8 @@ def test_patch_db_requests(mock_context_wrapper,):

patch_db_requests(mock_context_wrapper)

assert isinstance(psycopg2.connect, wrapt.ObjectProxy)

assert hasattr(Redis.execute_command, "__wrapped__")
assert hasattr(Pipeline.execute, "__wrapped__")
assert hasattr(Pipeline.immediate_execute_command, "__wrapped__")
Expand All @@ -35,6 +41,8 @@ def test_patch_db_requests(mock_context_wrapper,):

restore_db_requests()

assert not isinstance(psycopg2.connect, wrapt.ObjectProxy)

assert not hasattr(Redis.execute_command, "__wrapped__")
assert not hasattr(Pipeline.execute, "__wrapped__")
assert not hasattr(Pipeline.immediate_execute_command, "__wrapped__")
Expand All @@ -45,6 +53,8 @@ def test_patch_db_requests(mock_context_wrapper,):

def test_patch_db_requests_no_iopipe(mock_context,):
"""Asserts that monkey patching does not occur if IOpipe not present"""
assert not isinstance(psycopg2.connect, wrapt.ObjectProxy)

assert not hasattr(Redis.execute_command, "__wrapped__")
assert not hasattr(Pipeline.execute, "__wrapped__")
assert not hasattr(Pipeline.immediate_execute_command, "__wrapped__")
Expand All @@ -56,6 +66,8 @@ def test_patch_db_requests_no_iopipe(mock_context,):

patch_db_requests(mock_context)

assert not isinstance(psycopg2.connect, wrapt.ObjectProxy)

assert not hasattr(Redis.execute_command, "__wrapped__")
assert not hasattr(Pipeline.execute, "__wrapped__")
assert not hasattr(Pipeline.immediate_execute_command, "__wrapped__")
Expand Down
36 changes: 36 additions & 0 deletions tests/contrib/trace/test_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,3 +275,39 @@ def test_trace_plugin__auto_db__pymongo(

assert db_traces[0]["request"]["command"] == "insert_one"
assert db_traces[2]["request"]["command"] == "update"


@mock.patch("psycopg2.connect")
@mock.patch("iopipe.report.send_report", autospec=True)
def test_trace_plugin__auto_db__psycopg2(
mock_send_report, mock_connect, handler_with_trace_auto_db_psycopg2, mock_context
):
mock_connect.return_value.dsn = "postgres://username:password@localhost:5432/foobar"
type(mock_connect.return_value.cursor.return_value).query = mock.PropertyMock(
side_effect=[
"INSERT INTO test (num, data) VALUES (%s, %s)",
"SELECT * FROM test",
]
)

iopipe, handler = handler_with_trace_auto_db_psycopg2

assert len(iopipe.config["plugins"]) == 1

handler({}, mock_context)

assert len(iopipe.report.performance_entries) == 0

db_traces = iopipe.report.db_trace_entries

assert len(db_traces) == 2

for db_trace in db_traces:
assert db_trace["dbType"] == "postgresql"
assert db_trace["request"]["hostname"] == "localhost"
assert db_trace["request"]["port"] == "5432"
assert db_trace["request"]["db"] == "foobar"
assert db_trace["request"]["table"] == "test"

assert db_traces[0]["request"]["command"] == "insert"
assert db_traces[1]["request"]["command"] == "select"

0 comments on commit 58f1969

Please sign in to comment.