Skip to content

Commit

Permalink
Use a weak reference to sqlalchemy Engine to avoid memory leak (#1771)
Browse files Browse the repository at this point in the history
* Use a weak reference to sqlalchemy Engine to avoid memory leak

Closes #1761

By using a weak reference to the `Engine` object, we can avoid the memory leak as disposed `Engines` get properly deallocated. Whenever `SQLAlchemy` is uninstrumented, we only trigger a removal for those event listeners which are listening for objects that haven't been garbage-collected yet.

* Made a mistake in resolving the weak reference

* Fixed formatting issues

* Updated changelog

* Added unit test to check that engine was garbage collected

* Do not save engine in EngineTracer to avoid memory leak

* Add an empty line to satisfy black formatter

* Fix isort complaints

* Fixed the issue when pool name is not set and =None

* Fix formatting issue

* Rebased after changes in a recent commit

* Updated PR number in changelog

---------

Co-authored-by: Shalev Roda <65566801+shalevr@users.noreply.github.com>
  • Loading branch information
rbagd and shalevr authored Jun 27, 2023
1 parent a45c9c3 commit 2e49ba1
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 18 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `opentelemetry-instrumentation-system-metrics` Add `process.` prefix to `runtime.memory`, `runtime.cpu.time`, and `runtime.gc_count`. Change `runtime.memory` from count to UpDownCounter. ([#1735](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1735))
- Add request and response hooks for GRPC instrumentation (client only)
([#1706](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1706))
- Fix memory leak in SQLAlchemy instrumentation where disposed `Engine` does not get garbage collected
([#1771](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1771)
- `opentelemetry-instrumentation-pymemcache` Update instrumentation to support pymemcache >4
([#1764](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1764))
- `opentelemetry-instrumentation-confluent-kafka` Add support for higher versions of confluent_kafka
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.
import os
import re
import weakref

from sqlalchemy.event import ( # pylint: disable=no-name-in-module
listen,
Expand Down Expand Up @@ -99,11 +100,11 @@ def __init__(
commenter_options=None,
):
self.tracer = tracer
self.engine = engine
self.connections_usage = connections_usage
self.vendor = _normalize_vendor(engine.name)
self.enable_commenter = enable_commenter
self.commenter_options = commenter_options if commenter_options else {}
self._engine_attrs = _get_attributes_from_engine(engine)
self._leading_comment_remover = re.compile(r"^/\*.*?\*/")

self._register_event_listener(
Expand All @@ -118,23 +119,11 @@ def __init__(
self._register_event_listener(engine, "checkin", self._pool_checkin)
self._register_event_listener(engine, "checkout", self._pool_checkout)

def _get_connection_string(self):
drivername = self.engine.url.drivername or ""
host = self.engine.url.host or ""
port = self.engine.url.port or ""
database = self.engine.url.database or ""
return f"{drivername}://{host}:{port}/{database}"

def _get_pool_name(self):
if self.engine.pool.logging_name is not None:
return self.engine.pool.logging_name
return self._get_connection_string()

def _add_idle_to_connection_usage(self, value):
self.connections_usage.add(
value,
attributes={
"pool.name": self._get_pool_name(),
**self._engine_attrs,
"state": "idle",
},
)
Expand All @@ -143,7 +132,7 @@ def _add_used_to_connection_usage(self, value):
self.connections_usage.add(
value,
attributes={
"pool.name": self._get_pool_name(),
**self._engine_attrs,
"state": "used",
},
)
Expand All @@ -169,12 +158,21 @@ def _pool_checkout(
@classmethod
def _register_event_listener(cls, target, identifier, func, *args, **kw):
listen(target, identifier, func, *args, **kw)
cls._remove_event_listener_params.append((target, identifier, func))
cls._remove_event_listener_params.append(
(weakref.ref(target), identifier, func)
)

@classmethod
def remove_all_event_listeners(cls):
for remove_params in cls._remove_event_listener_params:
remove(*remove_params)
for (
weak_ref_target,
identifier,
func,
) in cls._remove_event_listener_params:
# Remove an event listener only if saved weak reference points to an object
# which has not been garbage collected
if weak_ref_target() is not None:
remove(weak_ref_target(), identifier, func)
cls._remove_event_listener_params.clear()

def _operation_name(self, db_name, statement):
Expand Down Expand Up @@ -300,3 +298,22 @@ def _get_attributes_from_cursor(vendor, cursor, attrs):
if info.port:
attrs[SpanAttributes.NET_PEER_PORT] = int(info.port)
return attrs


def _get_connection_string(engine):
drivername = engine.url.drivername or ""
host = engine.url.host or ""
port = engine.url.port or ""
database = engine.url.database or ""
return f"{drivername}://{host}:{port}/{database}"


def _get_attributes_from_engine(engine):
"""Set metadata attributes of the database engine"""
attrs = {}

attrs["pool.name"] = getattr(
getattr(engine, "pool", None), "logging_name", None
) or _get_connection_string(engine)

return attrs
Original file line number Diff line number Diff line change
Expand Up @@ -307,3 +307,26 @@ def test_no_op_tracer_provider(self):
cnx.execute("SELECT 1 + 1;").fetchall()
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 0)

def test_no_memory_leakage_if_engine_diposed(self):
SQLAlchemyInstrumentor().instrument()
import gc
import weakref

from sqlalchemy import create_engine

callback = mock.Mock()

def make_shortlived_engine():
engine = create_engine("sqlite:///:memory:")
# Callback will be called if engine is deallocated during garbage
# collection
weakref.finalize(engine, callback)
with engine.connect() as conn:
conn.execute("SELECT 1 + 1;").fetchall()

for _ in range(0, 5):
make_shortlived_engine()

gc.collect()
assert callback.call_count == 5

0 comments on commit 2e49ba1

Please sign in to comment.