Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add psycopg2 wrapper #350

Merged
merged 6 commits into from
Aug 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions iopipe/contrib/trace/auto_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,41 @@
import uuid
import wrapt

from .dbapi import AdapterProxy, ConnectionProxy, CursorProxy, table_name
from .util import ensure_utf8

Request = collections.namedtuple(
"Request", ["command", "key", "hostname", "port", "connectionName", "db", "table"]
)


def collect_psycopg2_metrics(context, trace, instance):
from psycopg2.extensions import parse_dsn

connection = instance.connection_proxy
dsn = parse_dsn(connection.dsn)

db = dsn.get("dbname")
hostname = dsn.get("host", "localhost")
port = dsn.get("port", 5432)

query = instance.query
command = query.split()[0].lower()
table = table_name(query, command)

request = Request(
command=ensure_utf8(command),
key=None,
hostname=ensure_utf8(hostname),
port=ensure_utf8(port),
connectionName=None,
db=ensure_utf8(db),
table=ensure_utf8(table),
)
request = request._asdict()
context.iopipe.mark.db_trace(trace, "postgresql", request)


def collect_pymongo_metrics(context, trace, instance, response):
from pymongo.cursor import Cursor
from pymongo.results import (
Expand Down Expand Up @@ -126,6 +154,70 @@ def wrapper(wrapped, instance, args, kwargs):
)


def patch_psycopg2(context):
"""
Monkey patches psycopg2 client, if available. Overloads the
execute method to add tracing and metrics collection.
"""

class PGCursorProxy(CursorProxy):
def execute(self, *args, **kwargs):
if not hasattr(context, "iopipe") or not hasattr(
context.iopipe, "mark"
): # pragma: no cover
self.__wrapped__.execute(*args, **kwargs)
return

id = ensure_utf8(str(uuid.uuid4()))
with context.iopipe.mark(id):
self.__wrapped__.execute(*args, **kwargs)
trace = context.iopipe.mark.measure(id)
context.iopipe.mark.delete(id)
collect_psycopg2_metrics(context, trace, self)

class PGConnectionProxy(ConnectionProxy):
def cursor(self, *args, **kwargs):
cursor = self.__wrapped__.cursor(*args, **kwargs)
return PGCursorProxy(cursor, self)

def adapt_wrapper(wrapped, instance, args, kwargs):
adapter = wrapped(*args, **kwargs)
return AdapterProxy(adapter) if hasattr(adapter, "prepare") else adapter

def connect_wrapper(wrapped, instance, args, kwargs):
connection = wrapped(*args, **kwargs)
return PGConnectionProxy(connection, args, kwargs)

def register_type_wrapper(wrapped, instance, args, kwargs):
def _extract_arguments(obj, scope=None):
return obj, scope

obj, scope = _extract_arguments(*args, **kwargs)

if scope is not None:
if isinstance(scope, wrapt.ObjectProxy):
scope = scope.__wrapped__
return wrapped(obj, scope)

return wrapped(obj)

try:
wrapt.wrap_function_wrapper("psycopg2", "connect", connect_wrapper)
except Exception: # pragma: no cover
pass
else:
wrapt.wrap_function_wrapper("psycopg2.extensions", "adapt", adapt_wrapper)
wrapt.wrap_function_wrapper(
"psycopg2.extensions", "register_type", register_type_wrapper
)
wrapt.wrap_function_wrapper(
"psycopg2._psycopg", "register_type", register_type_wrapper
)
wrapt.wrap_function_wrapper(
"psycopg2._json", "register_type", register_type_wrapper
)


def patch_redis(context):
"""
Monkey patches redis client, if available. Overloads the
Expand Down Expand Up @@ -179,6 +271,47 @@ def pipeline_wrapper(wrapped, instance, args, kwargs): # pragma: no cover
wrapt.wrap_function_wrapper(module_name, class_method, pipeline_wrapper)


def restore_psycopg2():
"""Restores psycopg2"""
try:
import psycopg2
except ImportError: # pragma: no cover
pass
else:
setattr(
psycopg2,
"connect",
getattr(psycopg2.connect, "__wrapped__", psycopg2.connect),
)
setattr(
psycopg2.extensions,
"register_type",
getattr(
psycopg2.extensions.register_type,
"__wrapped__",
psycopg2.extensions.register_type,
),
)
setattr(
psycopg2._psycopg,
"register_type",
getattr(
psycopg2._psycopg.register_type,
"__wrapped__",
psycopg2._psycopg.register_type,
),
)
setattr(
psycopg2._json,
"register_type",
getattr(
psycopg2._json.register_type,
"__wrapped__",
psycopg2._json.register_type,
),
)


def restore_pymongo():
"""Restores pymongo"""
try:
Expand Down Expand Up @@ -224,10 +357,12 @@ def patch_db_requests(context):
if not hasattr(context, "iopipe"):
return

patch_psycopg2(context)
patch_pymongo(context)
patch_redis(context)


def restore_db_requests():
restore_psycopg2()
restore_pymongo()
restore_redis()
65 changes: 65 additions & 0 deletions iopipe/contrib/trace/dbapi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import wrapt

COMMAND_KEYWORDS = {
"create": "table",
"delete": "from",
"insert": "into",
"select": "from",
"update": "update",
}


def table_name(query, command):
if command in COMMAND_KEYWORDS:
keyword = COMMAND_KEYWORDS[command]
parts = query.lower().split()

if keyword in parts:
return parts[parts.index(keyword) + 1]


class CursorProxy(wrapt.ObjectProxy):
def __init__(self, cursor, connection_proxy):
super(CursorProxy, self).__init__(cursor)

self._self_connection = connection_proxy

@property
def connection_proxy(self):
return self._self_connection

def execute(self, *args, **kwargs):
self.__wrapped__.execute(*args, **kwargs)


class ConnectionProxy(wrapt.ObjectProxy):
def __init__(self, connection, args, kwargs):
super(ConnectionProxy, self).__init__(connection)

self._self_args = args
self._self_kwargs = kwargs

def cursor(self, *args, **kwargs): # pragma: no cover
cursor = self.__wrapped__.cursor(*args, **kwargs)
return CursorProxy(cursor, self)

@property
def extract_hostname(self): # pragma: no cover
return self._self_kwargs.get("host", "localhost")

@property
def extract_dbname(self): # pragma: no cover
return self._self_kwargs.get("db", self._self_kwargs.get("database", ""))


class AdapterProxy(wrapt.ObjectProxy):
def prepare(self, *args, **kwargs): # pragma: no cover
if not args:
return self.__wrapped__.prepare(*args, **kwargs)

connection = args[0]

if isinstance(connection, wrapt.ObjectProxy):
connection = connection.__wrapped__

return self.__wrapped__.prepare(connection, *args[1:], **kwargs)
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"mock",
"mongomock==3.17.0",
"more-itertools<6.0.0",
"psycopg2-binary==2.8.3",
"pymongo==3.8.0",
"pytest==4.1.0",
"pytest-benchmark==3.2.0",
Expand Down
14 changes: 14 additions & 0 deletions tests/contrib/trace/conftest.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import psycopg2
import pymongo
import pytest
import redis
Expand Down Expand Up @@ -190,3 +191,16 @@ def _handler(event, context):
db.my_collection.update_one({"x": 10}, {"$inc": {"x": 3}})

return iopipe_with_trace_auto_db, _handler


@pytest.fixture
def handler_with_trace_auto_db_psycopg2(iopipe_with_trace_auto_db):
@iopipe_with_trace_auto_db
def _handler(event, context):
conn = psycopg2.connect("postgres://username:password@localhost:5432/foobar")
cur = conn.cursor()
cur.execute("INSERT INTO test (num, data) VALUES (%s, %s)", (100, "abc'def"))
cur.execute("SELECT * FROM test;")
cur.fetchone()

return iopipe_with_trace_auto_db, _handler
12 changes: 12 additions & 0 deletions tests/contrib/trace/test_auto_db.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import psycopg2
from pymongo.collection import Collection
from redis.client import Pipeline, Redis
import wrapt

from iopipe.contrib.trace.auto_db import patch_db_requests, restore_db_requests

Expand All @@ -17,6 +19,8 @@

def test_patch_db_requests(mock_context_wrapper,):
"""Asserts that monkey patching occurs if iopipe present"""
assert not isinstance(psycopg2.connect, wrapt.ObjectProxy)

assert not hasattr(Redis.execute_command, "__wrapped__")
assert not hasattr(Pipeline.execute, "__wrapped__")
assert not hasattr(Pipeline.immediate_execute_command, "__wrapped__")
Expand All @@ -26,6 +30,8 @@ def test_patch_db_requests(mock_context_wrapper,):

patch_db_requests(mock_context_wrapper)

assert isinstance(psycopg2.connect, wrapt.ObjectProxy)

assert hasattr(Redis.execute_command, "__wrapped__")
assert hasattr(Pipeline.execute, "__wrapped__")
assert hasattr(Pipeline.immediate_execute_command, "__wrapped__")
Expand All @@ -35,6 +41,8 @@ def test_patch_db_requests(mock_context_wrapper,):

restore_db_requests()

assert not isinstance(psycopg2.connect, wrapt.ObjectProxy)

assert not hasattr(Redis.execute_command, "__wrapped__")
assert not hasattr(Pipeline.execute, "__wrapped__")
assert not hasattr(Pipeline.immediate_execute_command, "__wrapped__")
Expand All @@ -45,6 +53,8 @@ def test_patch_db_requests(mock_context_wrapper,):

def test_patch_db_requests_no_iopipe(mock_context,):
"""Asserts that monkey patching does not occur if IOpipe not present"""
assert not isinstance(psycopg2.connect, wrapt.ObjectProxy)

assert not hasattr(Redis.execute_command, "__wrapped__")
assert not hasattr(Pipeline.execute, "__wrapped__")
assert not hasattr(Pipeline.immediate_execute_command, "__wrapped__")
Expand All @@ -56,6 +66,8 @@ def test_patch_db_requests_no_iopipe(mock_context,):

patch_db_requests(mock_context)

assert not isinstance(psycopg2.connect, wrapt.ObjectProxy)

assert not hasattr(Redis.execute_command, "__wrapped__")
assert not hasattr(Pipeline.execute, "__wrapped__")
assert not hasattr(Pipeline.immediate_execute_command, "__wrapped__")
Expand Down
36 changes: 36 additions & 0 deletions tests/contrib/trace/test_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,3 +275,39 @@ def test_trace_plugin__auto_db__pymongo(

assert db_traces[0]["request"]["command"] == "insert_one"
assert db_traces[2]["request"]["command"] == "update"


@mock.patch("psycopg2.connect")
@mock.patch("iopipe.report.send_report", autospec=True)
def test_trace_plugin__auto_db__psycopg2(
mock_send_report, mock_connect, handler_with_trace_auto_db_psycopg2, mock_context
):
mock_connect.return_value.dsn = "postgres://username:password@localhost:5432/foobar"
type(mock_connect.return_value.cursor.return_value).query = mock.PropertyMock(
side_effect=[
"INSERT INTO test (num, data) VALUES (%s, %s)",
"SELECT * FROM test",
]
)

iopipe, handler = handler_with_trace_auto_db_psycopg2

assert len(iopipe.config["plugins"]) == 1

handler({}, mock_context)

assert len(iopipe.report.performance_entries) == 0

db_traces = iopipe.report.db_trace_entries

assert len(db_traces) == 2

for db_trace in db_traces:
assert db_trace["dbType"] == "postgresql"
assert db_trace["request"]["hostname"] == "localhost"
assert db_trace["request"]["port"] == "5432"
assert db_trace["request"]["db"] == "foobar"
assert db_trace["request"]["table"] == "test"

assert db_traces[0]["request"]["command"] == "insert"
assert db_traces[1]["request"]["command"] == "select"