fix(futures): unload futures submodule during module cleanup (DataDog#5402)

This change ensures that the concurrent.futures submodule is unloaded
when doing module unloading (e.g. when gevent is detected). This ensures
that the module is reloaded when needed and uses the potential new state
of the cloned threading module.

## Issue insight

The stacks captured for all the processes during the thread pool
deadlock look as follows:

~~~
💤🧒 Process 82315 🧵 Thread 4312286592

Thread._bootstrap
(/Users/gabriele.tornetta/.pyenv/versions/3.11.2/lib/python3.11/threading.py:995)
Thread._bootstrap_inner
(/Users/gabriele.tornetta/.pyenv/versions/3.11.2/lib/python3.11/threading.py:1038)
Thread.run
(/Users/gabriele.tornetta/.pyenv/versions/3.11.2/lib/python3.11/threading.py:975)
_worker
(/Users/gabriele.tornetta/.pyenv/versions/3.11.2/lib/python3.11/concurrent/futures/thread.py:83)
_WorkItem.run
(/Users/gabriele.tornetta/.pyenv/versions/3.11.2/lib/python3.11/concurrent/futures/thread.py:58)
_wrap_execution
(/Users/gabriele.tornetta/p403n1x87/dd-trace-py/ddtrace/contrib/futures/threading.py:43)
Task.__call__
(/Users/gabriele.tornetta/p403n1x87/ddtrace-gevent-bug/.venv/lib/python3.11/site-packages/s3transfer/tasks.py:139)
Task._execute_main
(/Users/gabriele.tornetta/p403n1x87/ddtrace-gevent-bug/.venv/lib/python3.11/site-packages/s3transfer/tasks.py:162)
SubmissionTask._main
(/Users/gabriele.tornetta/p403n1x87/ddtrace-gevent-bug/.venv/lib/python3.11/site-packages/s3transfer/tasks.py:269)
UploadSubmissionTask._submit
(/Users/gabriele.tornetta/p403n1x87/ddtrace-gevent-bug/.venv/lib/python3.11/site-packages/s3transfer/upload.py:591)
UploadSubmissionTask._submit_upload_request
(/Users/gabriele.tornetta/p403n1x87/ddtrace-gevent-bug/.venv/lib/python3.11/site-packages/s3transfer/upload.py:626)
TransferCoordinator.submit
(/Users/gabriele.tornetta/p403n1x87/ddtrace-gevent-bug/.venv/lib/python3.11/site-packages/s3transfer/futures.py:323)
BoundedExecutor.submit
(/Users/gabriele.tornetta/p403n1x87/ddtrace-gevent-bug/.venv/lib/python3.11/site-packages/s3transfer/futures.py:474)
_wrap_submit
(/Users/gabriele.tornetta/p403n1x87/dd-trace-py/ddtrace/contrib/futures/threading.py:30)
ThreadPoolExecutor.submit
(/Users/gabriele.tornetta/.pyenv/versions/3.11.2/lib/python3.11/concurrent/futures/thread.py:162)


💤 Process 81968 🧵 Thread 4312286592

<module>
(/Users/gabriele.tornetta/p403n1x87/ddtrace-gevent-bug/.venv/bin/gunicorn:8)
run
(/Users/gabriele.tornetta/p403n1x87/ddtrace-gevent-bug/.venv/lib/python3.11/site-packages/gunicorn/app/wsgiapp.py:67)
Application.run
(/Users/gabriele.tornetta/p403n1x87/ddtrace-gevent-bug/.venv/lib/python3.11/site-packages/gunicorn/app/base.py:231)
BaseApplication.run
(/Users/gabriele.tornetta/p403n1x87/ddtrace-gevent-bug/.venv/lib/python3.11/site-packages/gunicorn/app/base.py:72)
Arbiter.run
(/Users/gabriele.tornetta/p403n1x87/ddtrace-gevent-bug/.venv/lib/python3.11/site-packages/gunicorn/arbiter.py:209)
Arbiter.sleep
(/Users/gabriele.tornetta/p403n1x87/ddtrace-gevent-bug/.venv/lib/python3.11/site-packages/gunicorn/arbiter.py:357)
~~~

## Implementation details

The patching of `ThreadPoolExecutor.submit` has been refactored to use
the internal wrapping utilities instead of `wrapt`. One problem with the
previous implementation was the way `wrapt` used module paths to locate
and patch the designated callable. Since the `concurrent` module is not
unloaded, any attempt to look up `futures` from `concurrent` returns the
original reference, even in cases where the reference to the cloned
`concurrent.futures` is the one that is wanted. _En passant_, the
refactor also revealed that the previous wrapper tolerated the mandatory
positional argument of `ThreadPoolExecutor.submit` being passed as a
keyword argument.
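
The stale-reference problem can be illustrated with a minimal sketch. It does not reproduce `wrapt`'s lookup logic; it only shows, under the assumption that `concurrent` stays loaded while `concurrent.futures` is removed from `sys.modules`, that a reference reached through the parent package can still be the old module, whereas a `sys.modules` lookup after re-import yields the fresh one. The `unload` helper is hypothetical and only loosely mirrors the cleanup done in `sitecustomize.py`.

```python
import importlib
import sys

import concurrent.futures.thread  # ensure the submodule is loaded up front


def unload(prefix):
    """Remove `prefix` and its submodules from sys.modules.

    The removed modules are returned so the caller can keep them alive.
    """
    removed = {}
    for name in list(sys.modules):
        if name == prefix or name.startswith(prefix + "."):
            removed[name] = sys.modules.pop(name)
    return removed


original = sys.modules["concurrent.futures.thread"]
unloaded = unload("concurrent.futures")

# The parent package is still loaded, so going through its attributes
# reaches the module object that was loaded before the cleanup.
stale = sys.modules["concurrent"].futures.thread

# A fresh import re-executes the module; sys.modules now holds the new one.
reloaded = importlib.import_module("concurrent.futures.thread")
current = sys.modules["concurrent.futures.thread"]

print("stale is original:", stale is original)
print("reloaded is original:", reloaded is original)
print("current is reloaded:", current is reloaded)
```

This is why `patch()` in this change reads `sys.modules["concurrent.futures.thread"]` directly and wraps the function object it finds there, rather than resolving a dotted path.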

## Testing strategy

The change includes a simple reproducing case that works with the fix in
place.

## Risks

The `concurrent` module was added to the exclude list in the
initial implementation of DataDog#4863 because of issues observed during test
runs (in particular with framework tests). It is unclear whether the
same issues would actually arise outside the CI context, in real
applications. However, the `concurrent.futures` module creates
lock objects on initialisation and is imported indirectly by
`ddtrace` via its dependencies, so fixing the issue caused by the
`futures` integration requires the module to be reloaded after the gevent
monkey-patching. The compromise in this PR is to unload only the
`concurrent.futures` sub-module (and its sub-modules), which seems
enough to solve the original issue.
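
To see why locks created at import time matter, here is a small sketch that does not use gevent. It writes a hypothetical module (`import_time_lock_demo`) that creates a lock when imported, then temporarily swaps `threading.Lock` for a stand-in class (`CooperativeLock`, also hypothetical) to simulate monkey-patching that happens after the first import. The lock held by the already-imported module keeps its original type; only unloading and re-importing the module picks up the patched one.

```python
import importlib
import sys
import tempfile
import threading
from pathlib import Path

# Hypothetical module that creates a lock at import time.
MODULE_NAME = "import_time_lock_demo"
MODULE_SOURCE = "import threading\nLOCK = threading.Lock()\n"


class CooperativeLock:
    """Hypothetical stand-in for the lock type installed by monkey-patching."""


def lock_types_before_and_after_reload():
    original_factory = threading.Lock
    with tempfile.TemporaryDirectory() as tmp:
        Path(tmp, MODULE_NAME + ".py").write_text(MODULE_SOURCE)
        sys.path.insert(0, tmp)
        importlib.invalidate_caches()
        try:
            module = importlib.import_module(MODULE_NAME)
            # Simulate monkey-patching that happens after the first import.
            threading.Lock = CooperativeLock
            stale_type = type(module.LOCK)  # still the pre-patch lock type

            # Unload and import again: the module body runs a second time
            # and picks up whatever threading.Lock is now.
            del sys.modules[MODULE_NAME]
            reloaded = importlib.import_module(MODULE_NAME)
            fresh_type = type(reloaded.LOCK)
        finally:
            # Undo the simulated patching and clean up the import state.
            threading.Lock = original_factory
            sys.path.remove(tmp)
            sys.modules.pop(MODULE_NAME, None)
    return stale_type, fresh_type


stale_type, fresh_type = lock_types_before_and_after_reload()
print("stale lock is patched:", stale_type is CooperativeLock)
print("fresh lock is patched:", fresh_type is CooperativeLock)
```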

Fixes DataDog#5398
P403n1x87 authored and IlyaMichlin committed Mar 30, 2023
1 parent 551a7dd commit a9dd3ed
Showing 10 changed files with 102 additions and 25 deletions.
2 changes: 1 addition & 1 deletion ddtrace/_monkey.py
@@ -115,7 +115,7 @@
"cassandra": ("cassandra.cluster",),
"dogpile_cache": ("dogpile.cache",),
"mysqldb": ("MySQLdb",),
"futures": ("concurrent.futures",),
"futures": ("concurrent.futures.thread",),
"vertica": ("vertica_python",),
"aws_lambda": ("datadog_lambda",),
"httplib": ("httplib" if PY2 else "http.client",),
29 changes: 22 additions & 7 deletions ddtrace/bootstrap/sitecustomize.py
@@ -100,6 +100,14 @@ def is_module_installed(module_name):


def cleanup_loaded_modules():
def drop(module_name):
# type: (str) -> None
if PY2:
# Store a reference to deleted modules to avoid them being garbage
# collected
_unloaded_modules.append(sys.modules[module_name])
del sys.modules[module_name]

MODULES_REQUIRING_CLEANUP = ("gevent",)
do_cleanup = os.getenv("DD_UNLOAD_MODULES_FROM_SITECUSTOMIZE", default="auto").lower()
if do_cleanup == "auto":
@@ -114,27 +122,34 @@ def cleanup_loaded_modules():
# gets when it does `import threading`. The same applies to every module
# not in `KEEP_MODULES`.
KEEP_MODULES = frozenset(["atexit", "ddtrace", "asyncio", "concurrent", "typing", "logging", "attr"])
if PY2:
KEEP_MODULES_PY2 = frozenset(["encodings", "codecs"])
for m in list(_ for _ in sys.modules if _ not in LOADED_MODULES):
if any(m == _ or m.startswith(_ + ".") for _ in KEEP_MODULES):
continue

if PY2:
KEEP_MODULES_PY2 = frozenset(["encodings", "codecs"])
if any(m == _ or m.startswith(_ + ".") for _ in KEEP_MODULES_PY2):
continue
# Store a reference to deleted modules to avoid them being garbage
# collected
_unloaded_modules.append(sys.modules[m])

del sys.modules[m]
drop(m)

# TODO: The better strategy is to identify the core modues in LOADED_MODULES
# that should not be unloaded, and then unload as much as possible.
UNLOAD_MODULES = frozenset(["time"])
UNLOAD_MODULES = frozenset(
[
# imported in Python >= 3.10 and patched by gevent
"time",
# we cannot unload the whole concurrent hierarchy, but this
# submodule makes use of threading so it is critical to unload when
# gevent is used.
"concurrent.futures",
]
)
for u in UNLOAD_MODULES:
for m in list(sys.modules):
if m == u or m.startswith(u + "."):
del sys.modules[m]
drop(m)


try:
39 changes: 30 additions & 9 deletions ddtrace/contrib/futures/patch.py
@@ -1,24 +1,45 @@
from concurrent import futures
import sys

from ddtrace.vendor.wrapt import wrap_function_wrapper as _w
from ddtrace.internal.compat import PY2
from ddtrace.internal.wrapping import unwrap as _u
from ddtrace.internal.wrapping import wrap as _w

from ..trace_utils import unwrap as _u
from .threading import _wrap_submit


def patch():
"""Enables Context Propagation between threads"""
if getattr(futures, "__datadog_patch", False):
try:
# Ensure that we get hold of the reloaded module if module cleanup was
# performed.
thread = sys.modules["concurrent.futures.thread"]
except KeyError:
import concurrent.futures.thread as thread

if getattr(thread, "__datadog_patch", False):
return
setattr(futures, "__datadog_patch", True)
setattr(thread, "__datadog_patch", True)

_w("concurrent.futures", "ThreadPoolExecutor.submit", _wrap_submit)
if PY2:
_w(thread.ThreadPoolExecutor.submit.__func__, _wrap_submit)
else:
_w(thread.ThreadPoolExecutor.submit, _wrap_submit)


def unpatch():
"""Disables Context Propagation between threads"""
if not getattr(futures, "__datadog_patch", False):
try:
# Ensure that we get hold of the reloaded module if module cleanup was
# performed.
thread = sys.modules["concurrent.futures.thread"]
except KeyError:
return
setattr(futures, "__datadog_patch", False)

_u(futures.ThreadPoolExecutor, "submit")
if not getattr(thread, "__datadog_patch", False):
return
setattr(thread, "__datadog_patch", False)

if PY2:
_u(thread.ThreadPoolExecutor.submit.__func__, _wrap_submit)
else:
_u(thread.ThreadPoolExecutor.submit, _wrap_submit)
9 changes: 5 additions & 4 deletions ddtrace/contrib/futures/threading.py
@@ -1,7 +1,7 @@
import ddtrace


def _wrap_submit(func, instance, args, kwargs):
def _wrap_submit(func, args, kwargs):
"""
Wrap `Executor` method used to submit a work executed in another
thread. This wrapper ensures that a new `Context` is created and
@@ -22,12 +22,13 @@ def _wrap_submit(func, instance, args, kwargs):
current_ctx = ddtrace.tracer.context_provider.active()

# The target function can be provided as a kwarg argument "fn" or the first positional argument
self = args[0]
if "fn" in kwargs:
fn = kwargs.pop("fn")
fn_args = args
fn_args = args[1:]
else:
fn, fn_args = args[0], args[1:]
return func(_wrap_execution, current_ctx, fn, fn_args, kwargs)
fn, fn_args = args[1], args[2:]
return func(self, _wrap_execution, current_ctx, fn, fn_args, kwargs)


def _wrap_execution(ctx, fn, args, kwargs):
@@ -0,0 +1,6 @@
---
fixes:
- |
futures: Resolves an issue that prevents tasks from being submitted to a
thread pool executor when gevent is used (e.g. as a worker class for
gunicorn or celery).
1 change: 1 addition & 0 deletions riotfile.py
@@ -2270,6 +2270,7 @@ def select_pys(min_version=MIN_PYTHON_VERSION, max_version=MAX_PYTHON_VERSION):
Venv(
name="futures",
command="pytest {cmdargs} tests/contrib/futures",
pkgs={"gevent": latest},
venvs=[
# futures is backported for 2.7
Venv(pys=["2.7"], pkgs={"futures": ["~=3.0", "~=3.1", "~=3.2", "~=3.4"]}),
4 changes: 3 additions & 1 deletion tests/conftest.py
@@ -188,6 +188,8 @@ def run_function_from_file(item, params=None):

args = [sys.executable]

timeout = marker.kwargs.get("timeout", None)

# Add ddtrace-run prefix in ddtrace-run mode
if marker.kwargs.get("ddtrace_run", False):
args.insert(0, "ddtrace-run")
@@ -222,7 +224,7 @@ def run_function_from_file(item, params=None):
args.extend(marker.kwargs.get("args", []))

def _subprocess_wrapper():
out, err, status, _ = call_program(*args, env=env, cwd=cwd)
out, err, status, _ = call_program(*args, env=env, cwd=cwd, timeout=timeout)

if status != expected_status:
raise AssertionError(
2 changes: 1 addition & 1 deletion tests/contrib/futures/test_futures_patch_generated.py
@@ -15,7 +15,7 @@

class TestFuturesPatch(PatchTestCase.Base):
__integration_name__ = "futures"
__module_name__ = "concurrent.futures"
__module_name__ = "concurrent.futures.thread"
__patch_func__ = patch
__unpatch_func__ = unpatch

27 changes: 26 additions & 1 deletion tests/contrib/futures/test_propagation.py
@@ -1,6 +1,8 @@
import concurrent
import time

import pytest

from ddtrace.contrib.futures import patch
from ddtrace.contrib.futures import unpatch
from tests.opentracer.utils import init_tracer
@@ -78,7 +80,7 @@ def fn(value, key=None):
with self.override_global_tracer():
with self.tracer.trace("main.thread"):
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
future = executor.submit(fn=fn, value=42, key="CheeseShop")
future = executor.submit(fn, value=42, key="CheeseShop")
value, key = future.result()
# assert the right result
self.assertEqual(value, 42)
@@ -349,3 +351,26 @@ def fn():
dict(name="main.thread"),
(dict(name="executor.thread"),),
)


@pytest.mark.subprocess(ddtrace_run=True, timeout=5)
def test_concurrent_futures_with_gevent():
"""Check compatibility between the integration and gevent"""
import os
import sys

pid = os.fork()
if pid == 0:
from gevent import monkey

monkey.patch_all()
import concurrent.futures.thread
from time import sleep

assert concurrent.futures.thread.__datadog_patch
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
future = executor.submit(lambda: sleep(0.1) or 42)
result = future.result()
assert result == 42
sys.exit(0)
os.waitpid(pid, 0)
8 changes: 7 additions & 1 deletion tests/utils.py
@@ -16,6 +16,7 @@
from ddtrace import Tracer
from ddtrace.constants import SPAN_MEASURED_KEY
from ddtrace.ext import http
from ddtrace.internal.compat import PY2
from ddtrace.internal.compat import httplib
from ddtrace.internal.compat import parse
from ddtrace.internal.compat import to_unicode
@@ -1030,9 +1031,14 @@ def __eq__(self, other):


def call_program(*args, **kwargs):
timeout = kwargs.pop("timeout", None)
close_fds = sys.platform != "win32"
subp = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=close_fds, **kwargs)
stdout, stderr = subp.communicate()
if PY2:
# Python 2 doesn't support timeout
stdout, stderr = subp.communicate()
else:
stdout, stderr = subp.communicate(timeout=timeout)
return stdout, stderr, subp.wait(), subp.pid

