Skip to content

Commit

Permalink
ext/psycopg2: Implement BaseInstrumentor interface (#694)
Browse files Browse the repository at this point in the history
- Implemented BaseInstrumentor interface to enable auto-instrumentation
- Added integration tests (same tests as other db integrations)
  • Loading branch information
cnnradams authored May 21, 2020
1 parent 9e58b8a commit 2d9c8df
Show file tree
Hide file tree
Showing 8 changed files with 216 additions and 128 deletions.
79 changes: 37 additions & 42 deletions ext/opentelemetry-ext-dbapi/src/opentelemetry/ext/dbapi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def instrument_connection(
connection_attributes=connection_attributes,
)
db_integration.get_connection_attributes(connection)
return TracedConnectionProxy(connection, db_integration)
return get_traced_connection_proxy(connection, db_integration)


def uninstrument_connection(connection):
Expand Down Expand Up @@ -227,7 +227,7 @@ def wrapped_connection(
"""
connection = connect_method(*args, **kwargs)
self.get_connection_attributes(connection)
return TracedConnectionProxy(connection, self)
return get_traced_connection_proxy(connection, self)

def get_connection_attributes(self, connection):
# Populate span fields using connection
Expand Down Expand Up @@ -260,23 +260,21 @@ def get_connection_attributes(self, connection):
self.span_attributes["net.peer.port"] = port


# pylint: disable=abstract-method
class TracedConnectionProxy(wrapt.ObjectProxy):
# pylint: disable=unused-argument
def __init__(
self,
connection,
db_api_integration: DatabaseApiIntegration,
*args,
**kwargs
):
wrapt.ObjectProxy.__init__(self, connection)
self._db_api_integration = db_api_integration
def get_traced_connection_proxy(
connection, db_api_integration, *args, **kwargs
):
# pylint: disable=abstract-method
class TracedConnectionProxy(wrapt.ObjectProxy):
# pylint: disable=unused-argument
def __init__(self, connection, *args, **kwargs):
wrapt.ObjectProxy.__init__(self, connection)

def cursor(self, *args, **kwargs):
return get_traced_cursor_proxy(
self.__wrapped__.cursor(*args, **kwargs), db_api_integration
)

def cursor(self, *args, **kwargs):
return TracedCursorProxy(
self.__wrapped__.cursor(*args, **kwargs), self._db_api_integration
)
return TracedConnectionProxy(connection, *args, **kwargs)


class TracedCursor:
Expand Down Expand Up @@ -323,31 +321,28 @@ def traced_execution(
raise ex


# pylint: disable=abstract-method
class TracedCursorProxy(wrapt.ObjectProxy):
def get_traced_cursor_proxy(cursor, db_api_integration, *args, **kwargs):
_traced_cursor = TracedCursor(db_api_integration)
# pylint: disable=abstract-method
class TracedCursorProxy(wrapt.ObjectProxy):

# pylint: disable=unused-argument
def __init__(
self,
cursor,
db_api_integration: DatabaseApiIntegration,
*args,
**kwargs
):
wrapt.ObjectProxy.__init__(self, cursor)
self._traced_cursor = TracedCursor(db_api_integration)
# pylint: disable=unused-argument
def __init__(self, cursor, *args, **kwargs):
wrapt.ObjectProxy.__init__(self, cursor)

def execute(self, *args, **kwargs):
return self._traced_cursor.traced_execution(
self.__wrapped__.execute, *args, **kwargs
)
def execute(self, *args, **kwargs):
return _traced_cursor.traced_execution(
self.__wrapped__.execute, *args, **kwargs
)

def executemany(self, *args, **kwargs):
return self._traced_cursor.traced_execution(
self.__wrapped__.executemany, *args, **kwargs
)
def executemany(self, *args, **kwargs):
return _traced_cursor.traced_execution(
self.__wrapped__.executemany, *args, **kwargs
)

def callproc(self, *args, **kwargs):
return self._traced_cursor.traced_execution(
self.__wrapped__.callproc, *args, **kwargs
)
def callproc(self, *args, **kwargs):
return _traced_cursor.traced_execution(
self.__wrapped__.callproc, *args, **kwargs
)

return TracedCursorProxy(cursor, *args, **kwargs)
4 changes: 0 additions & 4 deletions ext/opentelemetry-ext-dbapi/tests/test_dbapi_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,13 @@ def test_wrap_connect(self, mock_dbapi):
dbapi.wrap_connect(self.tracer, mock_dbapi, "connect", "-")
connection = mock_dbapi.connect()
self.assertEqual(mock_dbapi.connect.call_count, 1)
self.assertIsInstance(connection, dbapi.TracedConnectionProxy)
self.assertIsInstance(connection.__wrapped__, mock.Mock)

@mock.patch("opentelemetry.ext.dbapi")
def test_unwrap_connect(self, mock_dbapi):
dbapi.wrap_connect(self.tracer, mock_dbapi, "connect", "-")
connection = mock_dbapi.connect()
self.assertEqual(mock_dbapi.connect.call_count, 1)
self.assertIsInstance(connection, dbapi.TracedConnectionProxy)

dbapi.unwrap_connect(mock_dbapi, "connect")
connection = mock_dbapi.connect()
Expand All @@ -145,7 +143,6 @@ def test_instrument_connection(self):
# Avoid get_attributes failing because can't concatenate mock
connection.database = "-"
connection2 = dbapi.instrument_connection(self.tracer, connection, "-")
self.assertIsInstance(connection2, dbapi.TracedConnectionProxy)
self.assertIs(connection2.__wrapped__, connection)

def test_uninstrument_connection(self):
Expand All @@ -154,7 +151,6 @@ def test_uninstrument_connection(self):
# be concatenated
connection.database = "-"
connection2 = dbapi.instrument_connection(self.tracer, connection, "-")
self.assertIsInstance(connection2, dbapi.TracedConnectionProxy)
self.assertIs(connection2.__wrapped__, connection)

connection3 = dbapi.uninstrument_connection(connection2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import psycopg2

from opentelemetry import trace as trace_api
from opentelemetry.ext.psycopg2 import trace_integration
from opentelemetry.ext.psycopg2 import Psycopg2Instrumentor
from opentelemetry.test.test_base import TestBase

POSTGRES_HOST = os.getenv("POSTGRESQL_HOST ", "localhost")
Expand All @@ -35,7 +35,7 @@ def setUpClass(cls):
cls._connection = None
cls._cursor = None
cls._tracer = cls.tracer_provider.get_tracer(__name__)
trace_integration(cls.tracer_provider)
Psycopg2Instrumentor().instrument(tracer_provider=cls.tracer_provider)
cls._connection = psycopg2.connect(
dbname=POSTGRES_DB_NAME,
user=POSTGRES_USER,
Expand All @@ -52,6 +52,7 @@ def tearDownClass(cls):
cls._cursor.close()
if cls._connection:
cls._connection.close()
Psycopg2Instrumentor().uninstrument()

def validate_spans(self):
spans = self.memory_exporter.get_finished_spans()
Expand Down
2 changes: 2 additions & 0 deletions ext/opentelemetry-ext-psycopg2/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Unreleased

- Implement instrumentor interface, enabling auto-instrumentation ([#694]https://github.com/open-telemetry/opentelemetry-python/pull/694)

## 0.4a0

Released 2020-02-21
Expand Down
10 changes: 10 additions & 0 deletions ext/opentelemetry-ext-psycopg2/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,18 @@ package_dir=
packages=find_namespace:
install_requires =
opentelemetry-api == 0.8.dev0
opentelemetry-ext-dbapi == 0.8.dev0
opentelemetry-auto-instrumentation == 0.8.dev0
psycopg2-binary >= 2.7.3.1
wrapt >= 1.0.0, < 2.0.0

[options.extras_require]
test =
opentelemetry-test == 0.8.dev0

[options.packages.find]
where = src

[options.entry_points]
opentelemetry_instrumentor =
psycopg2 = opentelemetry.ext.psycopg2:Psycopg2Instrumentor
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
# limitations under the License.

"""
The integration with PostgreSQL supports the `Psycopg`_ library and is specified
to ``trace_integration`` using ``'PostgreSQL'``.
The integration with PostgreSQL supports the `Psycopg`_ library, it can be enabled by
using ``Psycopg2Instrumentor``.
.. _Psycopg: http://initd.org/psycopg/
Expand All @@ -26,11 +26,12 @@
import psycopg2
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.trace.ext.psycopg2 import trace_integration
from opentelemetry.trace.ext.psycopg2 import Psycopg2Instrumentor
trace.set_tracer_provider(TracerProvider())
trace_integration()
Psycopg2Instrumentor().instrument()
cnx = psycopg2.connect(database='Database')
cursor = cnx.cursor()
cursor.execute("INSERT INTO test (testField) VALUES (123)")
Expand All @@ -41,83 +42,77 @@
---
"""

import logging
import typing

import psycopg2
import wrapt
from psycopg2.sql import Composable

from opentelemetry.ext.dbapi import DatabaseApiIntegration, TracedCursor
from opentelemetry.auto_instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.ext import dbapi
from opentelemetry.ext.psycopg2.version import __version__
from opentelemetry.trace import Tracer, get_tracer

logger = logging.getLogger(__name__)

DATABASE_COMPONENT = "postgresql"
DATABASE_TYPE = "sql"
from opentelemetry.trace import TracerProvider, get_tracer


def trace_integration(tracer_provider=None):
"""Integrate with PostgreSQL Psycopg library.
Psycopg: http://initd.org/psycopg/
"""

tracer = get_tracer(__name__, __version__, tracer_provider)

connection_attributes = {
class Psycopg2Instrumentor(BaseInstrumentor):
_CONNECTION_ATTRIBUTES = {
"database": "info.dbname",
"port": "info.port",
"host": "info.host",
"user": "info.user",
}
db_integration = DatabaseApiIntegration(
tracer,
DATABASE_COMPONENT,
database_type=DATABASE_TYPE,
connection_attributes=connection_attributes,
)

# pylint: disable=unused-argument
def wrap_connect(
connect_func: typing.Callable[..., any],
instance: typing.Any,
args: typing.Tuple[any, any],
kwargs: typing.Dict[any, any],
):
connection = connect_func(*args, **kwargs)
db_integration.get_connection_attributes(connection)
connection.cursor_factory = PsycopgTraceCursor
return connection

try:
wrapt.wrap_function_wrapper(psycopg2, "connect", wrap_connect)
except Exception as ex: # pylint: disable=broad-except
logger.warning("Failed to integrate with pyscopg2. %s", str(ex))

class PsycopgTraceCursor(psycopg2.extensions.cursor):
def __init__(self, *args, **kwargs):
self._traced_cursor = TracedCursor(db_integration)
super(PsycopgTraceCursor, self).__init__(*args, **kwargs)

# pylint: disable=redefined-builtin
def execute(self, query, vars=None):
if isinstance(query, Composable):
query = query.as_string(self)
return self._traced_cursor.traced_execution(
super(PsycopgTraceCursor, self).execute, query, vars
)

# pylint: disable=redefined-builtin
def executemany(self, query, vars):
if isinstance(query, Composable):
query = query.as_string(self)
return self._traced_cursor.traced_execution(
super(PsycopgTraceCursor, self).executemany, query, vars
)

# pylint: disable=redefined-builtin
def callproc(self, procname, vars=None):
return self._traced_cursor.traced_execution(
super(PsycopgTraceCursor, self).callproc, procname, vars
)

_DATABASE_COMPONENT = "postgresql"
_DATABASE_TYPE = "sql"

def _instrument(self, **kwargs):
"""Integrate with PostgreSQL Psycopg library.
Psycopg: http://initd.org/psycopg/
"""

tracer_provider = kwargs.get("tracer_provider")

tracer = get_tracer(__name__, __version__, tracer_provider)

dbapi.wrap_connect(
tracer,
psycopg2,
"connect",
self._DATABASE_COMPONENT,
self._DATABASE_TYPE,
self._CONNECTION_ATTRIBUTES,
)

def _uninstrument(self, **kwargs):
""""Disable Psycopg2 instrumentation"""
dbapi.unwrap_connect(psycopg2, "connect")

# pylint:disable=no-self-use
def instrument_connection(self, connection):
"""Enable instrumentation in a Psycopg2 connection.
Args:
connection: The connection to instrument.
Returns:
An instrumented connection.
"""
tracer = get_tracer(__name__, __version__)

return dbapi.instrument_connection(
tracer,
connection,
self._DATABASE_COMPONENT,
self._DATABASE_TYPE,
self._CONNECTION_ATTRIBUTES,
)

def uninstrument_connection(self, connection):
"""Disable instrumentation in a Psycopg2 connection.
Args:
connection: The connection to uninstrument.
Returns:
An uninstrumented connection.
"""
return dbapi.uninstrument_connection(connection)
Loading

0 comments on commit 2d9c8df

Please sign in to comment.