From a9dd3ed0d8f174f5113d7bab4b35b17ec734c3e7 Mon Sep 17 00:00:00 2001 From: "Gabriele N. Tornetta" Date: Tue, 28 Mar 2023 11:37:54 +0100 Subject: [PATCH] fix(futures): unload futures submodule during module cleanup (#5402) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This change ensures that the concurrent.futures submodule is unloaded when doing module unloading (e.g. when gevent is detected). This ensures that the module is reloaded when needed and uses the potential new state of the cloned threading module. ## Issue insight The captured stacks for all the processes during the thread pool deadlock look as follows ~~~ ๐Ÿ’ค๐Ÿง’ Process 82315 ๐Ÿงต Thread 4312286592 Thread._bootstrap (/Users/gabriele.tornetta/.pyenv/versions/3.11.2/lib/python3.11/threading.py:995) Thread._bootstrap_inner (/Users/gabriele.tornetta/.pyenv/versions/3.11.2/lib/python3.11/threading.py:1038) Thread.run (/Users/gabriele.tornetta/.pyenv/versions/3.11.2/lib/python3.11/threading.py:975) _worker (/Users/gabriele.tornetta/.pyenv/versions/3.11.2/lib/python3.11/concurrent/futures/thread.py:83) _WorkItem.run (/Users/gabriele.tornetta/.pyenv/versions/3.11.2/lib/python3.11/concurrent/futures/thread.py:58) _wrap_execution (/Users/gabriele.tornetta/p403n1x87/dd-trace-py/ddtrace/contrib/futures/threading.py:43) Task.__call__ (/Users/gabriele.tornetta/p403n1x87/ddtrace-gevent-bug/.venv/lib/python3.11/site-packages/s3transfer/tasks.py:139) Task._execute_main (/Users/gabriele.tornetta/p403n1x87/ddtrace-gevent-bug/.venv/lib/python3.11/site-packages/s3transfer/tasks.py:162) SubmissionTask._main (/Users/gabriele.tornetta/p403n1x87/ddtrace-gevent-bug/.venv/lib/python3.11/site-packages/s3transfer/tasks.py:269) UploadSubmissionTask._submit (/Users/gabriele.tornetta/p403n1x87/ddtrace-gevent-bug/.venv/lib/python3.11/site-packages/s3transfer/upload.py:591) UploadSubmissionTask._submit_upload_request (/Users/gabriele.tornetta/p403n1x87/ddtrace-gevent-bug/.venv/lib/python3.11/site-packages/s3transfer/upload.py:626) TransferCoordinator.submit (/Users/gabriele.tornetta/p403n1x87/ddtrace-gevent-bug/.venv/lib/python3.11/site-packages/s3transfer/futures.py:323) BoundedExecutor.submit (/Users/gabriele.tornetta/p403n1x87/ddtrace-gevent-bug/.venv/lib/python3.11/site-packages/s3transfer/futures.py:474) _wrap_submit (/Users/gabriele.tornetta/p403n1x87/dd-trace-py/ddtrace/contrib/futures/threading.py:30) ThreadPoolExecutor.submit (/Users/gabriele.tornetta/.pyenv/versions/3.11.2/lib/python3.11/concurrent/futures/thread.py:162) ๐Ÿ’ค Process 81968 ๐Ÿงต Thread 4312286592 (/Users/gabriele.tornetta/p403n1x87/ddtrace-gevent-bug/.venv/bin/gunicorn:8) run (/Users/gabriele.tornetta/p403n1x87/ddtrace-gevent-bug/.venv/lib/python3.11/site-packages/gunicorn/app/wsgiapp.py:67) Application.run (/Users/gabriele.tornetta/p403n1x87/ddtrace-gevent-bug/.venv/lib/python3.11/site-packages/gunicorn/app/base.py:231) BaseApplication.run (/Users/gabriele.tornetta/p403n1x87/ddtrace-gevent-bug/.venv/lib/python3.11/site-packages/gunicorn/app/base.py:72) Arbiter.run (/Users/gabriele.tornetta/p403n1x87/ddtrace-gevent-bug/.venv/lib/python3.11/site-packages/gunicorn/arbiter.py:209) Arbiter.sleep (/Users/gabriele.tornetta/p403n1x87/ddtrace-gevent-bug/.venv/lib/python3.11/site-packages/gunicorn/arbiter.py:357) ~~~ ## Implementation details The patching of `ThreadPoolExecutor.submit` has been refactored to be performed with the internal wrapping utilities instead of `wrapt`. One issue caused by the previous implementation was in the way `wrapt` used module paths to locate and patch the designated callable. Since the `concurrent` module is not being unloaded, any attempts to look up `futures` from `concurrent` would lead to the original reference being returned, even in cases where the reference to the cloned `concurrent.futures` is desired. _En passant_, the refactor has also brought to light a tolerance to passing a mandatory positional argument to `ThreadPoolExecutor.submit` via keyword arguments. ## Testing strategy The change includes a simple reproducing case that works with the fix in place. ## Risks The `concurrent` module has been added to the exclude list in the initial implementation of #4863 because of issues observed during test runs (in particular with framework tests). It is unclear whether the same issues would actually arise outside the CI context and in actual applications. However, because the `concurrent.futures` modules creates lock objects on initialisation, and since it is imported indirectly by `ddtrace` via its dependencies, the issue caused by the `futures` integration requires the module to be reloaded after the gevent monkey-patching. The compromise of this PR is to unload only the `concurrent.futures` sub-module (and its sub-modules), which seems enough to solve the original issue. Fixes #5398 --- ddtrace/_monkey.py | 2 +- ddtrace/bootstrap/sitecustomize.py | 29 ++++++++++---- ddtrace/contrib/futures/patch.py | 39 ++++++++++++++----- ddtrace/contrib/futures/threading.py | 9 +++-- ...utures-module-unload-e84900affff06152.yaml | 6 +++ riotfile.py | 1 + tests/conftest.py | 4 +- .../futures/test_futures_patch_generated.py | 2 +- tests/contrib/futures/test_propagation.py | 27 ++++++++++++- tests/utils.py | 8 +++- 10 files changed, 102 insertions(+), 25 deletions(-) create mode 100644 releasenotes/notes/fix-futures-module-unload-e84900affff06152.yaml diff --git a/ddtrace/_monkey.py b/ddtrace/_monkey.py index c93a7e20996..4a85cffa64f 100644 --- a/ddtrace/_monkey.py +++ b/ddtrace/_monkey.py @@ -115,7 +115,7 @@ "cassandra": ("cassandra.cluster",), "dogpile_cache": ("dogpile.cache",), "mysqldb": ("MySQLdb",), - "futures": ("concurrent.futures",), + "futures": ("concurrent.futures.thread",), "vertica": ("vertica_python",), "aws_lambda": ("datadog_lambda",), "httplib": ("httplib" if PY2 else "http.client",), diff --git a/ddtrace/bootstrap/sitecustomize.py b/ddtrace/bootstrap/sitecustomize.py index 8d09b69bd6d..d7f25d739c5 100644 --- a/ddtrace/bootstrap/sitecustomize.py +++ b/ddtrace/bootstrap/sitecustomize.py @@ -100,6 +100,14 @@ def is_module_installed(module_name): def cleanup_loaded_modules(): + def drop(module_name): + # type: (str) -> None + if PY2: + # Store a reference to deleted modules to avoid them being garbage + # collected + _unloaded_modules.append(sys.modules[module_name]) + del sys.modules[module_name] + MODULES_REQUIRING_CLEANUP = ("gevent",) do_cleanup = os.getenv("DD_UNLOAD_MODULES_FROM_SITECUSTOMIZE", default="auto").lower() if do_cleanup == "auto": @@ -114,27 +122,34 @@ def cleanup_loaded_modules(): # gets when it does `import threading`. The same applies to every module # not in `KEEP_MODULES`. KEEP_MODULES = frozenset(["atexit", "ddtrace", "asyncio", "concurrent", "typing", "logging", "attr"]) + if PY2: + KEEP_MODULES_PY2 = frozenset(["encodings", "codecs"]) for m in list(_ for _ in sys.modules if _ not in LOADED_MODULES): if any(m == _ or m.startswith(_ + ".") for _ in KEEP_MODULES): continue if PY2: - KEEP_MODULES_PY2 = frozenset(["encodings", "codecs"]) if any(m == _ or m.startswith(_ + ".") for _ in KEEP_MODULES_PY2): continue - # Store a reference to deleted modules to avoid them being garbage - # collected - _unloaded_modules.append(sys.modules[m]) - del sys.modules[m] + drop(m) # TODO: The better strategy is to identify the core modues in LOADED_MODULES # that should not be unloaded, and then unload as much as possible. - UNLOAD_MODULES = frozenset(["time"]) + UNLOAD_MODULES = frozenset( + [ + # imported in Python >= 3.10 and patched by gevent + "time", + # we cannot unload the whole concurrent hierarchy, but this + # submodule makes use of threading so it is critical to unload when + # gevent is used. + "concurrent.futures", + ] + ) for u in UNLOAD_MODULES: for m in list(sys.modules): if m == u or m.startswith(u + "."): - del sys.modules[m] + drop(m) try: diff --git a/ddtrace/contrib/futures/patch.py b/ddtrace/contrib/futures/patch.py index bdb04454ec1..b0efeea8147 100644 --- a/ddtrace/contrib/futures/patch.py +++ b/ddtrace/contrib/futures/patch.py @@ -1,24 +1,45 @@ -from concurrent import futures +import sys -from ddtrace.vendor.wrapt import wrap_function_wrapper as _w +from ddtrace.internal.compat import PY2 +from ddtrace.internal.wrapping import unwrap as _u +from ddtrace.internal.wrapping import wrap as _w -from ..trace_utils import unwrap as _u from .threading import _wrap_submit def patch(): """Enables Context Propagation between threads""" - if getattr(futures, "__datadog_patch", False): + try: + # Ensure that we get hold of the reloaded module if module cleanup was + # performed. + thread = sys.modules["concurrent.futures.thread"] + except KeyError: + import concurrent.futures.thread as thread + + if getattr(thread, "__datadog_patch", False): return - setattr(futures, "__datadog_patch", True) + setattr(thread, "__datadog_patch", True) - _w("concurrent.futures", "ThreadPoolExecutor.submit", _wrap_submit) + if PY2: + _w(thread.ThreadPoolExecutor.submit.__func__, _wrap_submit) + else: + _w(thread.ThreadPoolExecutor.submit, _wrap_submit) def unpatch(): """Disables Context Propagation between threads""" - if not getattr(futures, "__datadog_patch", False): + try: + # Ensure that we get hold of the reloaded module if module cleanup was + # performed. + thread = sys.modules["concurrent.futures.thread"] + except KeyError: return - setattr(futures, "__datadog_patch", False) - _u(futures.ThreadPoolExecutor, "submit") + if not getattr(thread, "__datadog_patch", False): + return + setattr(thread, "__datadog_patch", False) + + if PY2: + _u(thread.ThreadPoolExecutor.submit.__func__, _wrap_submit) + else: + _u(thread.ThreadPoolExecutor.submit, _wrap_submit) diff --git a/ddtrace/contrib/futures/threading.py b/ddtrace/contrib/futures/threading.py index 9800978045f..ab67e215555 100644 --- a/ddtrace/contrib/futures/threading.py +++ b/ddtrace/contrib/futures/threading.py @@ -1,7 +1,7 @@ import ddtrace -def _wrap_submit(func, instance, args, kwargs): +def _wrap_submit(func, args, kwargs): """ Wrap `Executor` method used to submit a work executed in another thread. This wrapper ensures that a new `Context` is created and @@ -22,12 +22,13 @@ def _wrap_submit(func, instance, args, kwargs): current_ctx = ddtrace.tracer.context_provider.active() # The target function can be provided as a kwarg argument "fn" or the first positional argument + self = args[0] if "fn" in kwargs: fn = kwargs.pop("fn") - fn_args = args + fn_args = args[1:] else: - fn, fn_args = args[0], args[1:] - return func(_wrap_execution, current_ctx, fn, fn_args, kwargs) + fn, fn_args = args[1], args[2:] + return func(self, _wrap_execution, current_ctx, fn, fn_args, kwargs) def _wrap_execution(ctx, fn, args, kwargs): diff --git a/releasenotes/notes/fix-futures-module-unload-e84900affff06152.yaml b/releasenotes/notes/fix-futures-module-unload-e84900affff06152.yaml new file mode 100644 index 00000000000..eb0ca70c603 --- /dev/null +++ b/releasenotes/notes/fix-futures-module-unload-e84900affff06152.yaml @@ -0,0 +1,6 @@ +--- +fixes: + - | + futures: Resolves an issue that prevents tasks from being submitted to a + thread pool executor when gevent is used (e.g. as a worker class for + gunicorn or celery). diff --git a/riotfile.py b/riotfile.py index 2ca8fe6e5da..0e23b66b407 100644 --- a/riotfile.py +++ b/riotfile.py @@ -2270,6 +2270,7 @@ def select_pys(min_version=MIN_PYTHON_VERSION, max_version=MAX_PYTHON_VERSION): Venv( name="futures", command="pytest {cmdargs} tests/contrib/futures", + pkgs={"gevent": latest}, venvs=[ # futures is backported for 2.7 Venv(pys=["2.7"], pkgs={"futures": ["~=3.0", "~=3.1", "~=3.2", "~=3.4"]}), diff --git a/tests/conftest.py b/tests/conftest.py index 2212e081236..8a7446d56ec 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -188,6 +188,8 @@ def run_function_from_file(item, params=None): args = [sys.executable] + timeout = marker.kwargs.get("timeout", None) + # Add ddtrace-run prefix in ddtrace-run mode if marker.kwargs.get("ddtrace_run", False): args.insert(0, "ddtrace-run") @@ -222,7 +224,7 @@ def run_function_from_file(item, params=None): args.extend(marker.kwargs.get("args", [])) def _subprocess_wrapper(): - out, err, status, _ = call_program(*args, env=env, cwd=cwd) + out, err, status, _ = call_program(*args, env=env, cwd=cwd, timeout=timeout) if status != expected_status: raise AssertionError( diff --git a/tests/contrib/futures/test_futures_patch_generated.py b/tests/contrib/futures/test_futures_patch_generated.py index f698aa8ec16..384e5cc10e5 100644 --- a/tests/contrib/futures/test_futures_patch_generated.py +++ b/tests/contrib/futures/test_futures_patch_generated.py @@ -15,7 +15,7 @@ class TestFuturesPatch(PatchTestCase.Base): __integration_name__ = "futures" - __module_name__ = "concurrent.futures" + __module_name__ = "concurrent.futures.thread" __patch_func__ = patch __unpatch_func__ = unpatch diff --git a/tests/contrib/futures/test_propagation.py b/tests/contrib/futures/test_propagation.py index 35ac667ca40..8a2afe29227 100644 --- a/tests/contrib/futures/test_propagation.py +++ b/tests/contrib/futures/test_propagation.py @@ -1,6 +1,8 @@ import concurrent import time +import pytest + from ddtrace.contrib.futures import patch from ddtrace.contrib.futures import unpatch from tests.opentracer.utils import init_tracer @@ -78,7 +80,7 @@ def fn(value, key=None): with self.override_global_tracer(): with self.tracer.trace("main.thread"): with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: - future = executor.submit(fn=fn, value=42, key="CheeseShop") + future = executor.submit(fn, value=42, key="CheeseShop") value, key = future.result() # assert the right result self.assertEqual(value, 42) @@ -349,3 +351,26 @@ def fn(): dict(name="main.thread"), (dict(name="executor.thread"),), ) + + +@pytest.mark.subprocess(ddtrace_run=True, timeout=5) +def test_concurrent_futures_with_gevent(): + """Check compatibility between the integration and gevent""" + import os + import sys + + pid = os.fork() + if pid == 0: + from gevent import monkey + + monkey.patch_all() + import concurrent.futures.thread + from time import sleep + + assert concurrent.futures.thread.__datadog_patch + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + future = executor.submit(lambda: sleep(0.1) or 42) + result = future.result() + assert result == 42 + sys.exit(0) + os.waitpid(pid, 0) diff --git a/tests/utils.py b/tests/utils.py index ef93e8e5092..56da20f07d9 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -16,6 +16,7 @@ from ddtrace import Tracer from ddtrace.constants import SPAN_MEASURED_KEY from ddtrace.ext import http +from ddtrace.internal.compat import PY2 from ddtrace.internal.compat import httplib from ddtrace.internal.compat import parse from ddtrace.internal.compat import to_unicode @@ -1030,9 +1031,14 @@ def __eq__(self, other): def call_program(*args, **kwargs): + timeout = kwargs.pop("timeout", None) close_fds = sys.platform != "win32" subp = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=close_fds, **kwargs) - stdout, stderr = subp.communicate() + if PY2: + # Python 2 doesn't support timeout + stdout, stderr = subp.communicate() + else: + stdout, stderr = subp.communicate(timeout=timeout) return stdout, stderr, subp.wait(), subp.pid