diff --git a/ddtrace/auto.py b/ddtrace/auto.py new file mode 100644 index 00000000000..ebc0fe379c3 --- /dev/null +++ b/ddtrace/auto.py @@ -0,0 +1 @@ +import ddtrace.bootstrap.sitecustomize # noqa diff --git a/ddtrace/bootstrap/sitecustomize.py b/ddtrace/bootstrap/sitecustomize.py index 220deb9f474..677174e7c35 100644 --- a/ddtrace/bootstrap/sitecustomize.py +++ b/ddtrace/bootstrap/sitecustomize.py @@ -5,109 +5,19 @@ import sys -MODULES_LOADED_AT_STARTUP = frozenset(sys.modules.keys()) -MODULES_THAT_TRIGGER_CLEANUP_WHEN_INSTALLED = ("gevent",) - - -import os # noqa - - -""" -The following modules cause problems when being unloaded/reloaded in module cloning. -Notably, unloading the atexit module will remove all registered hooks which we use for cleaning up on tracer shutdown. -The other listed modules internally maintain some state that does not coexist well if reloaded. -""" -MODULES_TO_NOT_CLEANUP = {"atexit", "asyncio", "attr", "concurrent", "ddtrace", "logging"} -if sys.version_info < (3, 7): - MODULES_TO_NOT_CLEANUP |= {"typing"} # required by older versions of Python -if sys.version_info <= (2, 7): - MODULES_TO_NOT_CLEANUP |= {"encodings", "codecs"} - import imp - - _unloaded_modules = [] - - def is_installed(module_name): - try: - imp.find_module(module_name) - except ImportError: - return False - return True - - -else: - import importlib - - def is_installed(module_name): - return importlib.util.find_spec(module_name) - - -def should_cleanup_loaded_modules(): - dd_unload_sitecustomize_modules = os.getenv("DD_UNLOAD_MODULES_FROM_SITECUSTOMIZE", default="0").lower() - if dd_unload_sitecustomize_modules not in ("1", "auto"): - return False - elif dd_unload_sitecustomize_modules == "auto" and not any( - is_installed(module_name) for module_name in MODULES_THAT_TRIGGER_CLEANUP_WHEN_INSTALLED - ): - return False - return True - - -def cleanup_loaded_modules(aggressive=False): - """ - "Aggressive" here means "cleanup absolutely every module that has been loaded since startup". - Non-aggressive cleanup entails leaving untouched certain modules - This distinction is necessary because this function is used both to prepare for gevent monkeypatching - (requiring aggressive cleanup) and to implement "module cloning" (requiring non-aggressive cleanup) - """ - # Figuring out modules_loaded_since_startup is necessary because sys.modules has more in it than just what's in - # import statements in this file, and unloading some of them can break the interpreter. - modules_loaded_since_startup = set(_ for _ in sys.modules if _ not in MODULES_LOADED_AT_STARTUP) - # Unload all the modules that we have imported, except for ddtrace and a few - # others that don't like being cloned. - # Doing so will allow ddtrace to continue using its local references to modules unpatched by - # gevent, while avoiding conflicts with user-application code potentially running - # `gevent.monkey.patch_all()` and thus gevent-patched versions of the same modules. - for module_name in modules_loaded_since_startup: - if aggressive: - del sys.modules[module_name] - continue - - for module_to_not_cleanup in MODULES_TO_NOT_CLEANUP: - if module_name == module_to_not_cleanup: - break - elif module_name.startswith("%s." % module_to_not_cleanup): - break - else: - del sys.modules[module_name] - # Some versions of CPython import the time module during interpreter startup, which needs to be unloaded. 
- if "time" in sys.modules: - del sys.modules["time"] - - -will_run_module_cloning = should_cleanup_loaded_modules() -if not will_run_module_cloning: - # Perform gevent patching as early as possible in the application before - # importing more of the library internals. - if os.environ.get("DD_GEVENT_PATCH_ALL", "false").lower() in ("true", "1"): - # successfully running `gevent.monkey.patch_all()` this late into - # sitecustomize requires aggressive module unloading beforehand. - # gevent's documentation strongly warns against calling monkey.patch_all() anywhere other - # than the first line of the program. since that's what we're doing here, - # we cleanup aggressively beforehand to replicate the conditions at program start - # as closely as possible. - cleanup_loaded_modules(aggressive=True) - import gevent.monkey - - gevent.monkey.patch_all() +LOADED_MODULES = frozenset(sys.modules.keys()) import logging # noqa import os # noqa from typing import Any # noqa from typing import Dict # noqa +import warnings # noqa from ddtrace import config # noqa from ddtrace.debugging._config import config as debugger_config # noqa +from ddtrace.internal.compat import PY2 # noqa from ddtrace.internal.logger import get_logger # noqa +from ddtrace.internal.module import find_loader # noqa from ddtrace.internal.runtime.runtime_metrics import RuntimeWorker # noqa from ddtrace.internal.utils.formats import asbool # noqa from ddtrace.internal.utils.formats import parse_tags_str # noqa @@ -142,6 +52,25 @@ def cleanup_loaded_modules(aggressive=False): log = get_logger(__name__) +if os.environ.get("DD_GEVENT_PATCH_ALL") is not None: + deprecate( + "The environment variable DD_GEVENT_PATCH_ALL is deprecated and will be removed in a future version. ", + postfix="There is no special configuration necessary to make ddtrace work with gevent if using ddtrace-run. " + "If not using ddtrace-run, import ddtrace.auto before calling gevent.monkey.patch_all().", + removal_version="2.0.0", + ) +if "gevent" in sys.modules or "gevent.monkey" in sys.modules: + import gevent.monkey # noqa + + if gevent.monkey.is_module_patched("threading"): + warnings.warn( + "Loading ddtrace after gevent.monkey.patch_all() is not supported and is " + "likely to break the application. Use ddtrace-run to fix this, or " + "import ddtrace.auto before calling gevent.monkey.patch_all().", + RuntimeWarning, + ) + + EXTRA_PATCHED_MODULES = { "bottle": True, "django": True, @@ -162,6 +91,52 @@ def update_patched_modules(): EXTRA_PATCHED_MODULES[module] = asbool(should_patch) +if PY2: + _unloaded_modules = [] + + +def is_module_installed(module_name): + return find_loader(module_name) is not None + + +def cleanup_loaded_modules(): + MODULES_REQUIRING_CLEANUP = ("gevent",) + do_cleanup = os.getenv("DD_UNLOAD_MODULES_FROM_SITECUSTOMIZE", default="auto").lower() + if do_cleanup == "auto": + do_cleanup = any(is_module_installed(m) for m in MODULES_REQUIRING_CLEANUP) + + if not asbool(do_cleanup): + return + + # Unload all the modules that we have imported, except for the ddtrace one. + # NB: this means that every `import threading` anywhere in `ddtrace/` code + # uses a copy of that module that is distinct from the copy that user code + # gets when it does `import threading`. The same applies to every module + # not in `KEEP_MODULES`. 
+    KEEP_MODULES = frozenset(["atexit", "ddtrace", "asyncio", "concurrent", "typing", "logging", "attr"])
+    for m in list(_ for _ in sys.modules if _ not in LOADED_MODULES):
+        if any(m == _ or m.startswith(_ + ".") for _ in KEEP_MODULES):
+            continue
+
+        if PY2:
+            KEEP_MODULES_PY2 = frozenset(["encodings", "codecs"])
+            if any(m == _ or m.startswith(_ + ".") for _ in KEEP_MODULES_PY2):
+                continue
+            # Store a reference to deleted modules to avoid them being garbage
+            # collected
+            _unloaded_modules.append(sys.modules[m])
+
+        del sys.modules[m]
+
+    # TODO: The better strategy is to identify the core modules in LOADED_MODULES
+    # that should not be unloaded, and then unload as much as possible.
+    UNLOAD_MODULES = frozenset(["time"])
+    for u in UNLOAD_MODULES:
+        for m in list(sys.modules):
+            if m == u or m.startswith(u + "."):
+                del sys.modules[m]
+
+
 try:
     from ddtrace import tracer
 
@@ -202,19 +177,18 @@ def update_patched_modules():
     if not opts:
         tracer.configure(**opts)
 
-    # We need to clean up after we have imported everything we need from
-    # ddtrace, but before we register the patch-on-import hooks for the
-    # integrations. This is because registering a hook for a module
-    # that is already imported causes the module to be patched immediately.
-    # So if we unload the module after registering hooks, we effectively
-    # remove the patching, thus breaking the tracer integration.
-    if will_run_module_cloning:
-        cleanup_loaded_modules()
     if trace_enabled:
         update_patched_modules()
         from ddtrace import patch_all
 
+        # We need to clean up after we have imported everything we need from
+        # ddtrace, but before we register the patch-on-import hooks for the
+        # integrations.
+        cleanup_loaded_modules()
+
         patch_all(**EXTRA_PATCHED_MODULES)
+    else:
+        cleanup_loaded_modules()
 
     # Only the import of the original sitecustomize.py is allowed after this
     # point.
diff --git a/ddtrace/contrib/gevent/__init__.py b/ddtrace/contrib/gevent/__init__.py
index 838fb73073b..70db42fbacf 100644
--- a/ddtrace/contrib/gevent/__init__.py
+++ b/ddtrace/contrib/gevent/__init__.py
@@ -5,11 +5,9 @@
 The integration patches the gevent internals to add context management logic.
 
 .. note::
-    If :ref:`ddtrace-run` is being used set ``DD_GEVENT_PATCH_ALL=true`` and
-    ``gevent.monkey.patch_all()`` will be called as early as possible in the application
-    to avoid patching conflicts.
-    If ``ddtrace-run`` is not being used then be sure to call ``gevent.monkey.patch_all``
-    before importing ``ddtrace`` and calling ``ddtrace.patch`` or ``ddtrace.patch_all``.
+    If ``ddtrace-run`` is not being used then be sure to ``import ddtrace.auto``
+    before calling ``gevent.monkey.patch_all``.
+    If ``ddtrace-run`` is being used then no additional configuration is required.
 
 
 The integration also configures the global tracer instance to use a gevent
diff --git a/ddtrace/contrib/gunicorn/__init__.py b/ddtrace/contrib/gunicorn/__init__.py
index e2b598ea175..c0f017c19f1 100644
--- a/ddtrace/contrib/gunicorn/__init__.py
+++ b/ddtrace/contrib/gunicorn/__init__.py
@@ -1,33 +1,10 @@
 """
-**Note:** ``ddtrace-run`` and Python 2 are both not supported with `Gunicorn `__.
+ddtrace works with Gunicorn.
-``ddtrace`` only supports Gunicorn's ``gevent`` worker type when configured as follows: - -- The application is running under a Python version >=3.6 and <=3.10 -- `ddtrace-run` is not used -- The `DD_GEVENT_PATCH_ALL=1` environment variable is set -- Gunicorn's ```post_fork`` `__ hook does not import from - ``ddtrace`` -- ``import ddtrace.bootstrap.sitecustomize`` is called either in the application's main process or in the - ```post_worker_init`` `__ hook. - -.. code-block:: python - - # gunicorn.conf.py - def post_fork(server, worker): - # don't touch ddtrace here - pass - - def post_worker_init(worker): - import ddtrace.bootstrap.sitecustomize - - workers = 4 - worker_class = "gevent" - bind = "8080" - -.. code-block:: bash - - DD_GEVENT_PATCH_ALL=1 gunicorn --config gunicorn.conf.py path.to.my:app +.. note:: + If you cannot wrap your Gunicorn server with the ``ddtrace-run``command and + it uses ``gevent`` workers, be sure to ``import ddtrace.auto`` as early as + possible in your application's lifecycle. """ diff --git a/ddtrace/internal/forksafe.py b/ddtrace/internal/forksafe.py index c0bdf19eeb5..8f1a2846088 100644 --- a/ddtrace/internal/forksafe.py +++ b/ddtrace/internal/forksafe.py @@ -7,8 +7,6 @@ import typing import weakref -from ddtrace.internal.module import ModuleWatchdog -from ddtrace.internal.utils.formats import asbool from ddtrace.vendor import wrapt @@ -24,26 +22,6 @@ _soft = True -def patch_gevent_hub_reinit(module): - # The gevent hub is re-initialized *after* the after-in-child fork hooks are - # called, so we patch the gevent.hub.reinit function to ensure that the - # fork hooks run again after this further re-initialisation, if it is ever - # called. - from ddtrace.internal.wrapping import wrap - - def wrapped_reinit(f, args, kwargs): - try: - return f(*args, **kwargs) - finally: - ddtrace_after_in_child() - - wrap(module.reinit, wrapped_reinit) - - -if asbool(os.getenv("_DD_TRACE_GEVENT_HUB_PATCHED", default=False)): - ModuleWatchdog.register_module_hook("gevent.hub", patch_gevent_hub_reinit) - - def ddtrace_after_in_child(): # type: () -> None global _registry diff --git a/ddtrace/internal/nogevent.py b/ddtrace/internal/nogevent.py deleted file mode 100644 index e727cf4c3c5..00000000000 --- a/ddtrace/internal/nogevent.py +++ /dev/null @@ -1,122 +0,0 @@ -# -*- encoding: utf-8 -*- -"""This files exposes non-gevent Python original functions.""" -import threading - -import attr -import six - -from ddtrace.internal import compat -from ddtrace.internal import forksafe - - -try: - import gevent.monkey -except ImportError: - - def get_original(module, func): - return getattr(__import__(module), func) - - def is_module_patched(module): - return False - - -else: - get_original = gevent.monkey.get_original - is_module_patched = gevent.monkey.is_module_patched - - -sleep = get_original("time", "sleep") - -try: - # Python ≥ 3.8 - threading_get_native_id = get_original("threading", "get_native_id") -except AttributeError: - threading_get_native_id = None - -start_new_thread = get_original(six.moves._thread.__name__, "start_new_thread") -thread_get_ident = get_original(six.moves._thread.__name__, "get_ident") -Thread = get_original("threading", "Thread") -Lock = get_original("threading", "Lock") - -if six.PY2 and is_module_patched("threading"): - _allocate_lock = get_original("threading", "_allocate_lock") - _threading_RLock = get_original("threading", "_RLock") - _threading_Verbose = get_original("threading", "_Verbose") - - class _RLock(_threading_RLock): - """Patched RLock to 
ensure threading._allocate_lock is called rather than - gevent.threading._allocate_lock if patching has occurred. This is not - necessary in Python 3 where the RLock function uses the _CRLock so is - unaffected by gevent patching. - """ - - def __init__(self, verbose=None): - # We want to avoid calling the RLock init as it will allocate a gevent lock - # That means we have to reproduce the code from threading._RLock.__init__ here - # https://github.com/python/cpython/blob/8d21aa21f2cbc6d50aab3f420bb23be1d081dac4/Lib/threading.py#L132-L136 - _threading_Verbose.__init__(self, verbose) - self.__block = _allocate_lock() - self.__owner = None - self.__count = 0 - - def RLock(*args, **kwargs): - return _RLock(*args, **kwargs) - - -else: - # We do not patch RLock in Python 3 however for < 3.7 the C implementation of - # RLock might not be available as the _thread module is optional. In that - # case, the Python implementation will be used. This means there is still - # the possibility that RLock in Python 3 will cause problems for gevent with - # ddtrace profiling enabled though it remains an open question when that - # would be the case for the supported platforms. - # https://github.com/python/cpython/blob/c19983125a42a4b4958b11a26ab5e03752c956fc/Lib/threading.py#L38-L41 - # https://github.com/python/cpython/blob/c19983125a42a4b4958b11a26ab5e03752c956fc/Doc/library/_thread.rst#L26-L27 - RLock = get_original("threading", "RLock") - - -is_threading_patched = is_module_patched("threading") - -if is_threading_patched: - - @attr.s - class DoubleLock(object): - """A lock that prevent concurrency from a gevent coroutine and from a threading.Thread at the same time.""" - - # This is a gevent-patched threading.Lock (= a gevent Lock) - _lock = attr.ib(factory=forksafe.Lock, init=False, repr=False) - # This is a unpatched threading.Lock (= a real threading.Lock) - _thread_lock = attr.ib(factory=lambda: forksafe.ResetObject(Lock), init=False, repr=False) - - def acquire(self): - # type: () -> None - # You cannot acquire a gevent-lock from another thread if it has been acquired already: - # make sure we exclude the gevent-lock from being acquire by another thread by using a thread-lock first. - self._thread_lock.acquire() - self._lock.acquire() - - def release(self): - # type: () -> None - self._lock.release() - self._thread_lock.release() - - def __enter__(self): - # type: () -> DoubleLock - self.acquire() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.release() - - -else: - DoubleLock = threading.Lock # type: ignore[misc,assignment] - - -if is_threading_patched: - # NOTE: bold assumption: this module is always imported by the MainThread. - # The python `threading` module makes that assumption and it's beautiful we're going to do the same. - # We don't have the choice has we can't access the original MainThread - main_thread_id = thread_get_ident() -else: - main_thread_id = compat.main_thread.ident diff --git a/ddtrace/internal/periodic.py b/ddtrace/internal/periodic.py index ed0b7dff98f..0bacc36cc81 100644 --- a/ddtrace/internal/periodic.py +++ b/ddtrace/internal/periodic.py @@ -1,12 +1,9 @@ # -*- encoding: utf-8 -*- -import sys import threading -import time import typing import attr -from ddtrace.internal import nogevent from ddtrace.internal import service from . import forksafe @@ -53,54 +50,9 @@ def stop(self): if self.is_alive(): self.quit.set() - def _is_proper_class(self): - """ - Picture this: you're running a gunicorn server under ddtrace-run (`ddtrace-run gunicorn...`). 
- Profiler._profiler() (a PeriodicThread) is running in the main process. - Gunicorn forks the process and sets up the resulting child process as a "gevent worker". - Because of the Profiler's _restart_on_fork hook, inside the child process, the Profiler is stopped, copied, and - started for the purpose of profiling the child process. - In _restart_on_fork, the Profiler being stopped is the one that was running before the fork, ie the one in the - main process. Copying that Profiler takes place in the child process. Inside the child process, we've now got - *one* process-local Profiler running in a thread, and that's the only Profiler running. - - ...or is it? - - As it turns out, gevent has some of its own post-fork logic that complicates things. All of the above is - accurate, except for the bit about the child process' Profiler being the only one alive. In fact, gunicorn's - "gevent worker" notices that there was a Profiler (PeriodicThread) running before the fork, and attempts to - bring it back to life after the fork. - - Outside of ddtrace-run, the only apparent problem this causes is duplicated work and decreased performance. - Under ddtrace-run, because it triggers an additional fork to which gevent's post-fork logic responds, the - thread ends up being restarted twice in the child process. This means that there are a bunch of instances of - the thread running simultaneously: - - the "correct" one started by ddtrace's _restart_on_fork - the copy of the pre-fork one restarted by gevent after the fork done by gunicorn - the copy of the pre-fork one restarted by gevent after the fork done by ddtrace-run - - This causes even more problems for PeriodicThread uses like the Profiler that rely on running as singletons per - process. - - In these situations where there are many copies of the restarted thread, the copies are conveniently marked as - such by being instances of gevent.threading._DummyThread. - - This function _is_proper_class exists as a thread-local release valve that lets these _DummyThreads stop - themselves, because there's no other sane way to join all _DummyThreads from outside of those threads - themselves. Not doing this causes the _DummyThreads to be orphaned and hang, which can degrade performance - and cause crashes in threads that assume they're singletons. - - That's why it's hard to write a test for - doing so requires waiting for the threads to die on their own, which - can make tests take a really long time. - """ - return isinstance(threading.current_thread(), self.__class__) - def run(self): """Run the target function periodically.""" while not self.quit.wait(self.interval): - if not self._is_proper_class(): - break self._target() if self._on_shutdown is not None: self._on_shutdown() @@ -137,9 +89,6 @@ def awake(self): def run(self): """Run the target function periodically or on demand.""" while not self.quit.is_set(): - if not self._is_proper_class(): - break - self._target() if self.request.wait(self.interval): @@ -150,177 +99,6 @@ def run(self): self._on_shutdown() -class _GeventPeriodicThread(PeriodicThread): - """Periodic thread. - - This class can be used to instantiate a worker thread that will run its `run_periodic` function every `interval` - seconds. - - """ - - # That's the value Python 2 uses in its `threading` module - SLEEP_INTERVAL = 0.005 - - def __init__(self, interval, target, name=None, on_shutdown=None): - """Create a periodic thread. - - :param interval: The interval in seconds to wait between execution of the periodic function. 
- :param target: The periodic function to execute every interval. - :param name: The name of the thread. - :param on_shutdown: The function to call when the thread shuts down. - """ - super(_GeventPeriodicThread, self).__init__(interval, target, name, on_shutdown) - self._tident = None - self._periodic_started = False - self._periodic_stopped = False - - def _reset_internal_locks(self, is_alive=False): - # Called by Python via `threading._after_fork` - self._periodic_stopped = True - - @property - def ident(self): - return self._tident - - def start(self): - """Start the thread.""" - self.quit = False - if self._tident is not None: - raise RuntimeError("threads can only be started once") - self._tident = nogevent.start_new_thread(self.run, tuple()) - if nogevent.threading_get_native_id: - self._native_id = nogevent.threading_get_native_id() - - # Wait for the thread to be started to avoid race conditions - while not self._periodic_started: - time.sleep(self.SLEEP_INTERVAL) - - def is_alive(self): - return not self._periodic_stopped and self._periodic_started - - def join(self, timeout=None): - # FIXME: handle the timeout argument - while self.is_alive(): - time.sleep(self.SLEEP_INTERVAL) - - def stop(self): - """Stop the thread.""" - self.quit = True - - def run(self): - """Run the target function periodically.""" - # Do not use the threading._active_limbo_lock here because it's a gevent lock - threading._active[self._tident] = self - - self._periodic_started = True - - try: - while self.quit is False: - self._target() - slept = 0 - while self.quit is False and slept < self.interval: - nogevent.sleep(self.SLEEP_INTERVAL) - slept += self.SLEEP_INTERVAL - if self._on_shutdown is not None: - self._on_shutdown() - except Exception: - # Exceptions might happen during interpreter shutdown. - # We're mimicking what `threading.Thread` does in daemon mode, we ignore them. - # See `threading.Thread._bootstrap` for details. - if sys is not None: - raise - finally: - try: - self._periodic_stopped = True - del threading._active[self._tident] - except Exception: - # Exceptions might happen during interpreter shutdown. - # We're mimicking what `threading.Thread` does in daemon mode, we ignore them. - # See `threading.Thread._bootstrap` for details. - if sys is not None: - raise - - -class _GeventAwakeablePeriodicThread(_GeventPeriodicThread): - """Periodic awakeable thread.""" - - def __init__(self, interval, target, name=None, on_shutdown=None): - super(_GeventAwakeablePeriodicThread, self).__init__(interval, target, name, on_shutdown) - self.request = False - self.served = False - self.awake_lock = nogevent.DoubleLock() - - def stop(self): - """Stop the thread.""" - super(_GeventAwakeablePeriodicThread, self).stop() - self.request = True - - def awake(self): - with self.awake_lock: - self.served = False - self.request = True - while not self.served: - nogevent.sleep(self.SLEEP_INTERVAL) - - def run(self): - """Run the target function periodically.""" - # Do not use the threading._active_limbo_lock here because it's a gevent lock - threading._active[self._tident] = self - - self._periodic_started = True - - try: - while not self.quit: - self._target() - - slept = 0 - while self.request is False and slept < self.interval: - nogevent.sleep(self.SLEEP_INTERVAL) - slept += self.SLEEP_INTERVAL - - if self.request: - self.request = False - self.served = True - - if self._on_shutdown is not None: - self._on_shutdown() - except Exception: - # Exceptions might happen during interpreter shutdown. 
- # We're mimicking what `threading.Thread` does in daemon mode, we ignore them. - # See `threading.Thread._bootstrap` for details. - if sys is not None: - raise - finally: - try: - self._periodic_stopped = True - del threading._active[self._tident] - except Exception: - # Exceptions might happen during interpreter shutdown. - # We're mimicking what `threading.Thread` does in daemon mode, we ignore them. - # See `threading.Thread._bootstrap` for details. - if sys is not None: - raise - - -def PeriodicRealThreadClass(): - # type: () -> typing.Type[PeriodicThread] - """Return a PeriodicThread class based on the underlying thread implementation (native, gevent, etc). - - The returned class works exactly like ``PeriodicThread``, except that it runs on a *real* OS thread. Be aware that - this might be tricky in e.g. the gevent case, where ``Lock`` object must not be shared with the ``MainThread`` - (otherwise it'd dead lock). - - """ - if nogevent.is_module_patched("threading"): - return _GeventPeriodicThread - return PeriodicThread - - -def AwakeablePeriodicRealThreadClass(): - # type: () -> typing.Type[PeriodicThread] - return _GeventAwakeablePeriodicThread if nogevent.is_module_patched("threading") else AwakeablePeriodicThread - - @attr.s(eq=False) class PeriodicService(service.Service): """A service that runs periodically.""" @@ -328,10 +106,7 @@ class PeriodicService(service.Service): _interval = attr.ib(type=float) _worker = attr.ib(default=None, init=False, repr=False) - _real_thread = False - "Class variable to override if the service should run in a real OS thread." - - __thread_class__ = (PeriodicRealThreadClass, PeriodicThread) + __thread_class__ = PeriodicThread @property def interval(self): @@ -348,16 +123,10 @@ def interval( if self._worker: self._worker.interval = value - def _start_service( - self, - *args, # type: typing.Any - **kwargs # type: typing.Any - ): - # type: (...) -> None + def _start_service(self, *args, **kwargs): + # type: (typing.Any, typing.Any) -> None """Start the periodic service.""" - real_class, python_class = self.__thread_class__ - periodic_thread_class = real_class() if self._real_thread else python_class - self._worker = periodic_thread_class( + self._worker = self.__thread_class__( self.interval, target=self.periodic, name="%s:%s" % (self.__class__.__module__, self.__class__.__name__), @@ -365,12 +134,8 @@ def _start_service( ) self._worker.start() - def _stop_service( - self, - *args, # type: typing.Any - **kwargs # type: typing.Any - ): - # type: (...) -> None + def _stop_service(self, *args, **kwargs): + # type: (typing.Any, typing.Any) -> None """Stop the periodic collector.""" self._worker.stop() super(PeriodicService, self)._stop_service(*args, **kwargs) @@ -394,7 +159,7 @@ def periodic(self): class AwakeablePeriodicService(PeriodicService): """A service that runs periodically but that can also be awakened on demand.""" - __thread_class__ = (AwakeablePeriodicRealThreadClass, AwakeablePeriodicThread) + __thread_class__ = AwakeablePeriodicThread def awake(self): # type: (...) 
-> None diff --git a/ddtrace/profiling/_threading.pyx b/ddtrace/profiling/_threading.pyx index 6035032694b..52fb5a543f5 100644 --- a/ddtrace/profiling/_threading.pyx +++ b/ddtrace/profiling/_threading.pyx @@ -1,52 +1,64 @@ from __future__ import absolute_import -import threading +import sys +import threading as ddtrace_threading import typing import weakref import attr - -from ddtrace.internal import nogevent +from six.moves import _thread cpdef get_thread_name(thread_id): - # This is a special case for gevent: - # When monkey patching, gevent replaces all active threads by their greenlet equivalent. - # This means there's no chance to find the MainThread in the list of _active threads. - # Therefore we special case the MainThread that way. - # If native threads are started using gevent.threading, they will be inserted in threading._active - # so we will find them normally. - if thread_id == nogevent.main_thread_id: - return "MainThread" - - # We don't want to bother to lock anything here, especially with eventlet involved 😓. We make a best effort to - # get the thread name; if we fail, it'll just be an anonymous thread because it's either starting or dying. - try: - return threading._active[thread_id].name - except KeyError: + # Do not force-load the threading module if it's not already loaded + if "threading" not in sys.modules: + return None + + import threading + + # Look for all threads, including the ones we create + for threading_mod in (threading, ddtrace_threading): + # We don't want to bother to lock anything here, especially with + # eventlet involved 😓. We make a best effort to get the thread name; if + # we fail, it'll just be an anonymous thread because it's either + # starting or dying. try: - return threading._limbo[thread_id].name + return threading_mod._active[thread_id].name except KeyError: - return None + try: + return threading_mod._limbo[thread_id].name + except KeyError: + pass + + return None cpdef get_thread_native_id(thread_id): + # Do not force-load the threading module if it's not already loaded + if "threading" not in sys.modules: + return None + + import threading + try: thread_obj = threading._active[thread_id] except KeyError: - # This should not happen, unless somebody started a thread without - # using the `threading` module. - # In that case, well… just use the thread_id as native_id 🤞 - return thread_id - else: + try: + thread_obj = ddtrace_threading._active[thread_id] + except KeyError: + # This should not happen, unless somebody started a thread without + # using the `threading` module. + # In that case, well… just use the thread_id as native_id 🤞 + return thread_id + + try: # We prioritize using native ids since we expect them to be surely unique for a program. This is less true # for hashes since they are relative to the memory address which can easily be the same across different # objects. - try: - return thread_obj.native_id - except AttributeError: - # Python < 3.8 - return hash(thread_obj) + return thread_obj.native_id + except AttributeError: + # Python < 3.8 + return hash(thread_obj) # cython does not play well with mypy @@ -76,7 +88,7 @@ class _ThreadLink(_thread_link_base): ): # type: (...) 
-> None """Link an object to the current running thread.""" - self._thread_id_to_object[nogevent.thread_get_ident()] = weakref.ref(obj) + self._thread_id_to_object[_thread.get_ident()] = weakref.ref(obj) def clear_threads(self, existing_thread_ids, # type: typing.Set[int] diff --git a/ddtrace/profiling/collector/_lock.py b/ddtrace/profiling/collector/_lock.py index 14059adc48b..7cd8aed487c 100644 --- a/ddtrace/profiling/collector/_lock.py +++ b/ddtrace/profiling/collector/_lock.py @@ -6,9 +6,9 @@ import typing import attr +from six.moves import _thread from ddtrace.internal import compat -from ddtrace.internal import nogevent from ddtrace.internal.utils import attr as attr_utils from ddtrace.internal.utils import formats from ddtrace.profiling import _threading @@ -43,7 +43,7 @@ class LockReleaseEvent(LockEventBase): def _current_thread(): # type: (...) -> typing.Tuple[int, str] - thread_id = nogevent.thread_get_ident() + thread_id = _thread.get_ident() return thread_id, _threading.get_thread_name(thread_id) @@ -122,12 +122,8 @@ def acquire(self, *args, **kwargs): except Exception: pass - def release( - self, - *args, # type: typing.Any - **kwargs # type: typing.Any - ): - # type: (...) -> None + def release(self, *args, **kwargs): + # type (typing.Any, typing.Any) -> None try: return self.__wrapped__.release(*args, **kwargs) finally: @@ -145,7 +141,7 @@ def release( frames, nframes = _traceback.pyframe_to_frames(frame, self._self_max_nframes) - event = self.RELEASE_EVENT_CLASS( # type: ignore[call-arg] + event = self.RELEASE_EVENT_CLASS( lock_name=self._self_name, frames=frames, nframes=nframes, diff --git a/ddtrace/profiling/collector/_task.pyx b/ddtrace/profiling/collector/_task.pyx index 5caabe5b6bd..07e001c5b57 100644 --- a/ddtrace/profiling/collector/_task.pyx +++ b/ddtrace/profiling/collector/_task.pyx @@ -1,25 +1,35 @@ +import sys import weakref from ddtrace.internal import compat -from ddtrace.internal import nogevent +from ddtrace.vendor.wrapt.importer import when_imported from .. import _asyncio from .. import _threading -try: - import gevent.hub - import gevent.thread - from greenlet import getcurrent - from greenlet import greenlet - from greenlet import settrace -except ImportError: - _gevent_tracer = None -else: +_gevent_tracer = None + + +@when_imported("gevent") +def install_greenlet_tracer(gevent): + global _gevent_tracer + + try: + import gevent.hub + import gevent.thread + from greenlet import getcurrent + from greenlet import greenlet + from greenlet import settrace + except ImportError: + # We don't seem to have the required dependencies. + return class DDGreenletTracer(object): - def __init__(self): + def __init__(self, gevent): # type: (...) -> None + self.gevent = gevent + self.previous_trace_function = settrace(self) self.greenlets = weakref.WeakValueDictionary() self.active_greenlet = getcurrent() @@ -44,9 +54,7 @@ else: if self.previous_trace_function is not None: self.previous_trace_function(event, args) - # NOTE: bold assumption: this module is always imported by the MainThread. - # A GreenletTracer is local to the thread instantiating it and we assume this is run by the MainThread. - _gevent_tracer = DDGreenletTracer() + _gevent_tracer = DDGreenletTracer(gevent) cdef _asyncio_task_get_frame(task): @@ -83,8 +91,9 @@ cpdef get_task(thread_id): # gevent greenlet support: # - we only support tracing tasks in the greenlets run in the MainThread. # - if both gevent and asyncio are in use (!) 
we only return asyncio - if task_id is None and thread_id == nogevent.main_thread_id and _gevent_tracer is not None: - task_id = gevent.thread.get_ident(_gevent_tracer.active_greenlet) + if task_id is None and _gevent_tracer is not None: + gevent_thread = _gevent_tracer.gevent.thread + task_id = gevent_thread.get_ident(_gevent_tracer.active_greenlet) # Greenlets might be started as Thread in gevent task_name = _threading.get_thread_name(task_id) frame = _gevent_tracer.active_greenlet.gr_frame @@ -105,7 +114,7 @@ cpdef list_tasks(thread_id): # We consider all Thread objects to be greenlet # This should be true as nobody could use a half-monkey-patched gevent - if thread_id == nogevent.main_thread_id and _gevent_tracer is not None: + if _gevent_tracer is not None: tasks.extend([ (greenlet_id, _threading.get_thread_name(greenlet_id), diff --git a/ddtrace/profiling/collector/stack.pyx b/ddtrace/profiling/collector/stack.pyx index f9bb099062b..d5cb6ee866b 100644 --- a/ddtrace/profiling/collector/stack.pyx +++ b/ddtrace/profiling/collector/stack.pyx @@ -2,7 +2,7 @@ from __future__ import absolute_import import sys -import threading +import threading as ddtrace_threading # this is ddtrace's internal copy of the module, not the application's copy import typing import attr @@ -20,10 +20,6 @@ from ddtrace.profiling.collector import _traceback from ddtrace.profiling.collector import stack_event -# NOTE: Do not use LOG here. This code runs under a real OS thread and is unable to acquire any lock of the `logging` -# module without having gevent crashing our dedicated thread. - - # These are special features that might not be available depending on your Python version and platform FEATURES = { "cpu-time": False, @@ -296,14 +292,12 @@ cdef collect_threads(thread_id_ignore_list, thread_time, thread_span_links) with cdef stack_collect(ignore_profiler, thread_time, max_nframes, interval, wall_time, thread_span_links, collect_endpoint): - - if ignore_profiler: - # Do not use `threading.enumerate` to not mess with locking (gevent!) - thread_id_ignore_list = {thread_id - for thread_id, thread in threading._active.items() - if getattr(thread, "_ddtrace_profiling_ignore", False)} - else: - thread_id_ignore_list = set() + # Do not use `threading.enumerate` to not mess with locking (gevent!) + thread_id_ignore_list = { + thread_id + for thread_id, thread in ddtrace_threading._active.items() + if getattr(thread, "_ddtrace_profiling_ignore", False) + } if ignore_profiler else set() running_threads = collect_threads(thread_id_ignore_list, thread_time, thread_span_links) @@ -317,12 +311,6 @@ cdef stack_collect(ignore_profiler, thread_time, max_nframes, interval, wall_tim for thread_id, thread_native_id, thread_name, thread_pyframes, exception, span, cpu_time in running_threads: thread_task_id, thread_task_name, thread_task_frame = _task.get_task(thread_id) - # When gevent thread monkey-patching is enabled, our PeriodicCollector non-real-threads are gevent tasks. - # Therefore, they run in the main thread and their samples are collected by `collect_threads`. 
- # We ignore them here: - if thread_task_id in thread_id_ignore_list: - continue - tasks = _task.list_tasks(thread_id) # Inject wall time for all running tasks diff --git a/ddtrace/profiling/recorder.py b/ddtrace/profiling/recorder.py index 3bfc5cb46f0..e94a9a01077 100644 --- a/ddtrace/profiling/recorder.py +++ b/ddtrace/profiling/recorder.py @@ -1,11 +1,11 @@ # -*- encoding: utf-8 -*- import collections +import threading import typing import attr from ddtrace.internal import forksafe -from ddtrace.internal import nogevent from . import event @@ -39,7 +39,7 @@ class Recorder(object): """A dict of {event_type_class: max events} to limit the number of events to record.""" events = attr.ib(init=False, repr=False, eq=False, type=EventsType) - _events_lock = attr.ib(init=False, repr=False, factory=nogevent.DoubleLock, eq=False) + _events_lock = attr.ib(init=False, repr=False, factory=threading.Lock, eq=False) def __attrs_post_init__(self): # type: (...) -> None diff --git a/ddtrace_gevent_check.py b/ddtrace_gevent_check.py deleted file mode 100644 index cfbe5ce77e7..00000000000 --- a/ddtrace_gevent_check.py +++ /dev/null @@ -1,13 +0,0 @@ -import sys -import warnings - - -def gevent_patch_all(event): - if "ddtrace" in sys.modules: - warnings.warn( - "Loading ddtrace before using gevent monkey patching is not supported " - "and is likely to break the application. " - "Use `DD_GEVENT_PATCH_ALL=true ddtrace-run` to fix this or " - "import `ddtrace` after `gevent.monkey.patch_all()` has been called.", - RuntimeWarning, - ) diff --git a/releasenotes/notes/nogevent-6f2892cb412f987f.yaml b/releasenotes/notes/nogevent-6f2892cb412f987f.yaml new file mode 100644 index 00000000000..eb73c01232a --- /dev/null +++ b/releasenotes/notes/nogevent-6f2892cb412f987f.yaml @@ -0,0 +1,9 @@ +--- +deprecations: + - | + gevent: ``DD_GEVENT_PATCH_ALL`` is deprecated and will be removed in the next major version. Gevent compatibility is now automatic + and does not require extra configuration when running with ``ddtrace-run``. If not using ``ddtrace-run``, please import ``ddtrace.auto`` before calling ``gevent.monkey.patch_all()``. +fixes: + - | + gevent: This fix resolves an incompatibility between ddtrace and gevent that caused threads to hang in certain configurations, for example + the profiler running in a gunicorn application's gevent worker process. 
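For applications that do not use ``ddtrace-run``, the migration described in the release note above amounts to a one-line change. A minimal sketch, assuming ddtrace and gevent are installed (the entrypoint name is illustrative):

    # app entrypoint: ddtrace must bootstrap before gevent rewrites the stdlib
    import ddtrace.auto  # noqa  # equivalent to importing ddtrace.bootstrap.sitecustomize

    import gevent.monkey

    gevent.monkey.patch_all()

    # application code follows; ddtrace keeps its own references to the
    # unpatched modules it captured before patch_all() ran
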
diff --git a/riotfile.py b/riotfile.py index 63ffc614b31..a1fa689765e 100644 --- a/riotfile.py +++ b/riotfile.py @@ -2607,8 +2607,12 @@ def select_pys(min_version=MIN_PYTHON_VERSION, max_version=MAX_PYTHON_VERSION): pkgs={"requests": latest, "gevent": latest}, venvs=[ Venv( - pys=select_pys(min_version="3.8"), - pkgs={"gunicorn": ["==19.10.0", "==20.0.4", latest]}, + pys=select_pys(min_version="3.5"), + pkgs={"gunicorn": ["==19.10.0", latest]}, + ), + Venv( + pys="2.7", + pkgs={"gunicorn": ["==19.10.0"]}, ), ], ), diff --git a/setup.py b/setup.py index 90e7c15a332..5fca9edb42e 100644 --- a/setup.py +++ b/setup.py @@ -342,7 +342,6 @@ def get_exts_for(name): "ddtrace.appsec": ["rules.json"], "ddtrace.appsec.ddwaf": [os.path.join("libddwaf", "*", "lib", "libddwaf.*")], }, - py_modules=["ddtrace_gevent_check"], python_requires=">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*", zip_safe=False, # enum34 is an enum backport for earlier versions of python @@ -393,9 +392,6 @@ def get_exts_for(name): "ddtrace = ddtrace.contrib.pytest.plugin", "ddtrace.pytest_bdd = ddtrace.contrib.pytest_bdd.plugin", ], - "gevent.plugins.monkey.did_patch_all": [ - "ddtrace_gevent_check = ddtrace_gevent_check:gevent_patch_all", - ], }, classifiers=[ "Programming Language :: Python", diff --git a/tests/commands/test_runner.py b/tests/commands/test_runner.py index f9ea896e705..ccc8d43b029 100644 --- a/tests/commands/test_runner.py +++ b/tests/commands/test_runner.py @@ -253,16 +253,7 @@ def test_logs_injection(self): """Ensure logs injection works""" with self.override_env(dict(DD_LOGS_INJECTION="true", DD_CALL_BASIC_CONFIG="true")): out = subprocess.check_output(["ddtrace-run", "python", "tests/commands/ddtrace_run_logs_injection.py"]) - assert out.startswith(b"Test success") - - def test_gevent_patch_all(self): - with self.override_env(dict(DD_GEVENT_PATCH_ALL="true")): - out = subprocess.check_output(["ddtrace-run", "python", "tests/commands/ddtrace_run_gevent.py"]) - assert out.startswith(b"Test success") - - with self.override_env(dict(DD_GEVENT_PATCH_ALL="1")): - out = subprocess.check_output(["ddtrace-run", "python", "tests/commands/ddtrace_run_gevent.py"]) - assert out.startswith(b"Test success") + assert out.startswith(b"Test success"), out.decode() def test_debug_mode(self): with self.override_env(dict(DD_CALL_BASIC_CONFIG="true")): diff --git a/tests/contrib/gevent/test_monkeypatch.py b/tests/contrib/gevent/test_monkeypatch.py index bc1ea277ab9..a64f4d59380 100644 --- a/tests/contrib/gevent/test_monkeypatch.py +++ b/tests/contrib/gevent/test_monkeypatch.py @@ -17,7 +17,7 @@ def test_gevent_warning(monkeypatch): ) assert subp.wait() == 0 assert subp.stdout.read() == b"" - assert b"RuntimeWarning: Loading ddtrace before using gevent monkey patching" in subp.stderr.read() + assert subp.stderr.read() == b"" @pytest.mark.subprocess diff --git a/tests/contrib/gunicorn/post_fork.py b/tests/contrib/gunicorn/post_fork.py deleted file mode 100644 index a39037eb3ed..00000000000 --- a/tests/contrib/gunicorn/post_fork.py +++ /dev/null @@ -1,19 +0,0 @@ -import sys - -import gevent.monkey - -from ddtrace.debugging import DynamicInstrumentation -from ddtrace.internal.remoteconfig import RemoteConfig - - -# take some notes about the relative ordering of thread creation and -# monkeypatching -monkeypatch_happened = gevent.monkey.is_module_patched("threading") - -# enabling DI here allows test cases to exercise the code paths that handle -# gevent monkeypatching of running threads -# post_fork is called before 
gevent.monkey.patch_all() -if sys.version_info < (3, 11): - DynamicInstrumentation.enable() - -RemoteConfig._was_enabled_after_gevent_monkeypatch = monkeypatch_happened diff --git a/tests/contrib/gunicorn/test_gunicorn.py b/tests/contrib/gunicorn/test_gunicorn.py index 5b493658621..cc6a89f8451 100644 --- a/tests/contrib/gunicorn/test_gunicorn.py +++ b/tests/contrib/gunicorn/test_gunicorn.py @@ -11,13 +11,11 @@ import pytest import tenacity -from ddtrace.internal.compat import stringify +from ddtrace.internal import compat from tests.webclient import Client SERVICE_INTERVAL = 1 -# this is the most direct manifestation i can find of a bug caused by misconfigured gunicorn+ddtrace -MOST_DIRECT_KNOWN_GUNICORN_RELATED_PROFILER_ERROR_SIGNAL = b"RuntimeError: the memalloc module is already started" GunicornServerSettings = NamedTuple( @@ -30,20 +28,12 @@ ("worker_class", str), ("bind", str), ("use_ddtracerun", bool), - ("import_sitecustomize_in_postworkerinit", bool), - ("start_service_in_hook_named", str), + ("import_auto_in_postworkerinit", bool), ], ) -IMPORT_SITECUSTOMIZE = "import ddtrace.bootstrap.sitecustomize" -with open(os.path.join(os.path.dirname(os.path.abspath(__file__)), "post_fork.py"), "r") as f: - code = f.readlines() -START_SERVICE = " " + " ".join(code) - - -def assert_no_profiler_error(server_process): - assert MOST_DIRECT_KNOWN_GUNICORN_RELATED_PROFILER_ERROR_SIGNAL not in server_process.stderr.read() +IMPORT_AUTO = "import ddtrace.auto" def parse_payload(data): @@ -53,17 +43,6 @@ def parse_payload(data): return json.loads(decoded) -def assert_remoteconfig_started_successfully(response, check_patch=True): - # ddtrace and gunicorn don't play nicely under python 3.5 or 3.11 - if sys.version_info[1] in (5, 11): - return - assert response.status_code == 200 - payload = parse_payload(response.content) - assert payload["remoteconfig"]["worker_alive"] is True - if check_patch: - assert payload["remoteconfig"]["enabled_after_gevent_monkeypatch"] is True - - def _gunicorn_settings_factory( env=None, # type: Dict[str, str] directory=os.getcwd(), # type: str @@ -72,22 +51,19 @@ def _gunicorn_settings_factory( worker_class="sync", # type: str bind="0.0.0.0:8080", # type: str use_ddtracerun=True, # type: bool - import_sitecustomize_in_postworkerinit=False, # type: bool - patch_gevent=None, # type: Optional[bool] - import_sitecustomize_in_app=None, # type: Optional[bool] - start_service_in_hook_named="post_fork", # type: str + import_auto_in_postworkerinit=False, # type: bool + import_auto_in_app=None, # type: Optional[bool] enable_module_cloning=False, # type: bool ): # type: (...) 
-> GunicornServerSettings """Factory for creating gunicorn settings with simple defaults if settings are not defined.""" if env is None: env = os.environ.copy() - if patch_gevent is not None: - env["DD_GEVENT_PATCH_ALL"] = str(patch_gevent) - if import_sitecustomize_in_app is not None: - env["_DD_TEST_IMPORT_SITECUSTOMIZE"] = str(import_sitecustomize_in_app) + if import_auto_in_app is not None: + env["_DD_TEST_IMPORT_AUTO"] = str(import_auto_in_app) env["DD_UNLOAD_MODULES_FROM_SITECUSTOMIZE"] = "1" if enable_module_cloning else "0" - env["DD_REMOTECONFIG_POLL_SECONDS"] = str(SERVICE_INTERVAL) + env["DD_REMOTE_CONFIGURATION_ENABLED"] = str(True) + env["DD_REMOTECONFIG_POLL_INTERVAL_SECONDS"] = str(SERVICE_INTERVAL) env["DD_PROFILING_UPLOAD_INTERVAL"] = str(SERVICE_INTERVAL) return GunicornServerSettings( env=env, @@ -97,24 +73,15 @@ def _gunicorn_settings_factory( worker_class=worker_class, bind=bind, use_ddtracerun=use_ddtracerun, - import_sitecustomize_in_postworkerinit=import_sitecustomize_in_postworkerinit, - start_service_in_hook_named=start_service_in_hook_named, + import_auto_in_postworkerinit=import_auto_in_postworkerinit, ) def build_config_file(gunicorn_server_settings): - post_fork = START_SERVICE if gunicorn_server_settings.start_service_in_hook_named != "post_worker_init" else "" - post_worker_init = " {sitecustomize}\n{service_start}".format( - sitecustomize=IMPORT_SITECUSTOMIZE if gunicorn_server_settings.import_sitecustomize_in_postworkerinit else "", - service_start=START_SERVICE - if gunicorn_server_settings.start_service_in_hook_named == "post_worker_init" - else "", + post_worker_init = " {}".format( + IMPORT_AUTO if gunicorn_server_settings.import_auto_in_postworkerinit else "", ) cfg = """ -def post_fork(server, worker): - pass -{post_fork} - def post_worker_init(worker): pass {post_worker_init} @@ -123,7 +90,6 @@ def post_worker_init(worker): worker_class = "{worker_class}" bind = "{bind}" """.format( - post_fork=post_fork, post_worker_init=post_worker_init, bind=gunicorn_server_settings.bind, num_workers=gunicorn_server_settings.num_workers, @@ -136,7 +102,7 @@ def post_worker_init(worker): def gunicorn_server(gunicorn_server_settings, tmp_path): cfg_file = tmp_path / "gunicorn.conf.py" cfg = build_config_file(gunicorn_server_settings) - cfg_file.write_text(stringify(cfg)) + cfg_file.write_text(compat.stringify(cfg)) cmd = [] if gunicorn_server_settings.use_ddtracerun: cmd = ["ddtrace-run"] @@ -147,17 +113,19 @@ def gunicorn_server(gunicorn_server_settings, tmp_path): cmd, env=gunicorn_server_settings.env, cwd=gunicorn_server_settings.directory, - stderr=subprocess.PIPE, + stdout=sys.stdout, + stderr=sys.stderr, close_fds=True, preexec_fn=os.setsid, ) try: client = Client("http://%s" % gunicorn_server_settings.bind) try: + print("Waiting for server to start") client.wait(max_tries=100, delay=0.1) + print("Server started") except tenacity.RetryError: raise TimeoutError("Server failed to start, see stdout and stderr logs") - # wait for services to wake up and decide whether to self-destruct due to PeriodicThread._is_proper_class time.sleep(SERVICE_INTERVAL) yield server_process, client try: @@ -169,54 +137,35 @@ def gunicorn_server(gunicorn_server_settings, tmp_path): server_process.wait() -SETTINGS_GEVENT_DDTRACERUN_MODULE_CLONE = _gunicorn_settings_factory( - worker_class="gevent", patch_gevent=False, enable_module_cloning=True +SETTINGS_GEVENT_DDTRACERUN_MODULE_CLONE = _gunicorn_settings_factory(worker_class="gevent", enable_module_cloning=True) 
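For context on the settings defined just above and below: when ``import_auto_in_postworkerinit`` is set, the ``build_config_file`` helper above renders a config roughly like the following sketch (the worker count shown is an assumption; bind and worker class are the factory defaults):

    # gunicorn.conf.py as generated for the post_worker_init scenario
    def post_worker_init(worker):
        pass
        import ddtrace.auto  # the template always emits `pass`, then the optional import

    workers = 1  # assumed value; the factory's num_workers default is not shown in this diff
    worker_class = "gevent"
    bind = "0.0.0.0:8080"
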
+SETTINGS_GEVENT_DDTRACERUN = _gunicorn_settings_factory( + worker_class="gevent", ) -SETTINGS_GEVENT_DDTRACERUN_PATCH = _gunicorn_settings_factory(worker_class="gevent", patch_gevent=True) -SETTINGS_GEVENT_APPIMPORT_PATCH_POSTWORKERSERVICE = _gunicorn_settings_factory( +SETTINGS_GEVENT_APPIMPORT = _gunicorn_settings_factory( worker_class="gevent", use_ddtracerun=False, - import_sitecustomize_in_app=True, - patch_gevent=True, - start_service_in_hook_named="post_worker_init", + import_auto_in_app=True, ) -SETTINGS_GEVENT_POSTWORKERIMPORT_PATCH_POSTWORKERSERVICE = _gunicorn_settings_factory( +SETTINGS_GEVENT_POSTWORKERIMPORT = _gunicorn_settings_factory( worker_class="gevent", use_ddtracerun=False, - import_sitecustomize_in_postworkerinit=True, - patch_gevent=True, - start_service_in_hook_named="post_worker_init", + import_auto_in_postworkerinit=True, ) -@pytest.mark.skipif(sys.version_info > (3, 10), reason="Gunicorn is only supported up to 3.10") +@pytest.mark.skipif(sys.version_info >= (3, 11), reason="Gunicorn is only supported up to 3.10") @pytest.mark.parametrize( "gunicorn_server_settings", [ - SETTINGS_GEVENT_APPIMPORT_PATCH_POSTWORKERSERVICE, - SETTINGS_GEVENT_POSTWORKERIMPORT_PATCH_POSTWORKERSERVICE, + SETTINGS_GEVENT_APPIMPORT, + SETTINGS_GEVENT_POSTWORKERIMPORT, SETTINGS_GEVENT_DDTRACERUN_MODULE_CLONE, ], ) def test_no_known_errors_occur(gunicorn_server_settings, tmp_path): with gunicorn_server(gunicorn_server_settings, tmp_path) as context: - server_process, client = context - r = client.get("/") - assert_no_profiler_error(server_process) - assert_remoteconfig_started_successfully(r, gunicorn_server_settings.env["DD_GEVENT_PATCH_ALL"] == "True") - - -@pytest.mark.parametrize( - "gunicorn_server_settings", - [ - SETTINGS_GEVENT_DDTRACERUN_PATCH, - ], -) -def test_profiler_error_occurs_under_gevent_worker(gunicorn_server_settings, tmp_path): - with gunicorn_server(gunicorn_server_settings, tmp_path) as context: - server_process, client = context - r = client.get("/") - # this particular error does not manifest in 3.8 and older - if sys.version_info >= (3, 9): - assert MOST_DIRECT_KNOWN_GUNICORN_RELATED_PROFILER_ERROR_SIGNAL in server_process.stderr.read() - assert_remoteconfig_started_successfully(r) + _, client = context + response = client.get("/") + assert response.status_code == 200 + payload = parse_payload(response.content) + assert payload["profiler"]["is_active"] is True diff --git a/tests/contrib/gunicorn/wsgi_mw_app.py b/tests/contrib/gunicorn/wsgi_mw_app.py index 98898349836..34199f570ef 100644 --- a/tests/contrib/gunicorn/wsgi_mw_app.py +++ b/tests/contrib/gunicorn/wsgi_mw_app.py @@ -4,18 +4,16 @@ """ import json import os -import sys from ddtrace import tracer from ddtrace.contrib.wsgi import DDWSGIMiddleware -from ddtrace.internal.remoteconfig import RemoteConfig from ddtrace.profiling import bootstrap import ddtrace.profiling.auto # noqa from tests.webclient import PingFilter -if os.getenv("_DD_TEST_IMPORT_SITECUSTOMIZE"): - import ddtrace.bootstrap.sitecustomize # noqa: F401 # isort: skip +if os.getenv("_DD_TEST_IMPORT_AUTO"): + import ddtrace.auto # noqa: F401 # isort: skip tracer.configure( settings={ @@ -23,14 +21,12 @@ } ) -if sys.version_info < (3, 11): - from ddtrace.debugging import DynamicInstrumentation +SCHEDULER_SENTINEL = -1 +assert bootstrap.profiler._scheduler._last_export not in (None, SCHEDULER_SENTINEL) +bootstrap.profiler._scheduler._last_export = SCHEDULER_SENTINEL def aggressive_shutdown(): - RemoteConfig.disable() - if sys.version_info < (3, 
11): - DynamicInstrumentation.disable() tracer.shutdown(timeout=1) if hasattr(bootstrap, "profiler"): bootstrap.profiler._scheduler.stop() @@ -40,17 +36,12 @@ def aggressive_shutdown(): def simple_app(environ, start_response): if environ["RAW_URI"] == "/shutdown": aggressive_shutdown() - data = bytes("goodbye", encoding="utf-8") + data = b"goodbye" else: - has_config_worker = hasattr(RemoteConfig._worker, "_worker") payload = { - "remoteconfig": { - "worker_alive": has_config_worker and RemoteConfig._worker._worker.is_alive(), - "enabled_after_gevent_monkeypatch": RemoteConfig._was_enabled_after_gevent_monkeypatch, - }, + "profiler": {"is_active": bootstrap.profiler._scheduler._last_export != SCHEDULER_SENTINEL}, } - json_payload = json.dumps(payload) - data = bytes(json_payload, encoding="utf-8") + data = json.dumps(payload).encode("utf-8") start_response("200 OK", [("Content-Type", "text/plain"), ("Content-Length", str(len(data)))]) return iter([data]) diff --git a/tests/internal/test_forksafe.py b/tests/internal/test_forksafe.py index 08f78fbd1d7..a168b2fb966 100644 --- a/tests/internal/test_forksafe.py +++ b/tests/internal/test_forksafe.py @@ -1,5 +1,5 @@ +from collections import Counter import os -import sys import pytest import six @@ -288,61 +288,72 @@ def fn(): assert exit_code == 42 -# FIXME: subprocess marks do not respect pytest.mark.skips -if sys.version_info < (3, 11, 0): +@pytest.mark.subprocess( + out=lambda _: Counter(_) == {"C": 3, "T": 4}, + err=None, + ddtrace_run=True, +) +def test_gevent_gunicorn_behaviour(): + # emulate how sitecustomize.py cleans up imported modules + # to avoid problems with threads/forks that we saw previously + # when running gunicorn with gevent workers - @pytest.mark.subprocess( - out=("CTCTCT" if sys.platform == "darwin" or (3,) < sys.version_info < (3, 7) else "CCCTTT"), - err=None, - env=dict(_DD_TRACE_GEVENT_HUB_PATCHED="true"), - ) - def test_gevent_reinit_patch(): - import os - import sys + import sys - from ddtrace.internal import forksafe - from ddtrace.internal.periodic import PeriodicService + assert "gevent" not in sys.modules - class TestService(PeriodicService): - def __init__(self): - super(TestService, self).__init__(interval=1.0) + assert "ddtrace.internal" in sys.modules + assert "ddtrace.internal.periodic" in sys.modules - def periodic(self): - sys.stdout.write("T") + import atexit - service = TestService() - service.start() + from ddtrace.internal import forksafe + from ddtrace.internal.periodic import PeriodicService - def restart_service(): - global service + class TestService(PeriodicService): + def __init__(self): + super(TestService, self).__init__(interval=1.0) - service.stop() - service = TestService() - service.start() + def periodic(self): + sys.stdout.write("T") + self.stop() - forksafe.register(restart_service) + service = TestService() + service.start() + atexit.register(service.stop) - import gevent # noqa + def restart_service(): + global service + service.stop() + service = TestService() + service.start() - def run_child(): - global service + forksafe.register(restart_service) + atexit.register(lambda: service.join(1)) - # We mimic what gunicorn does in child processes - gevent.monkey.patch_all() - gevent.hub.reinit() + # ---- Application code ---- - sys.stdout.write("C") + import os # noqa + import sys # noqa - gevent.sleep(1.5) + import gevent.hub # noqa + import gevent.monkey # noqa - service.stop() + def run_child(): + # We mimic what gunicorn does in child processes + gevent.monkey.patch_all() + 
gevent.hub.reinit() - def fork_workers(num): - for _ in range(num): - if os.fork() == 0: - run_child() - sys.exit(0) + sys.stdout.write("C") - fork_workers(3) + gevent.sleep(1.5) - service.stop() + def fork_workers(num): + for _ in range(num): + if os.fork() == 0: + run_child() + sys.exit(0) + + fork_workers(3) + + exit() diff --git a/tests/profiling/collector/test_asyncio.py b/tests/profiling/collector/test_asyncio.py index 00eca3e23bd..d7ed9ab9e86 100644 --- a/tests/profiling/collector/test_asyncio.py +++ b/tests/profiling/collector/test_asyncio.py @@ -2,8 +2,8 @@ import uuid import pytest +from six.moves import _thread -from ddtrace.internal import nogevent from ddtrace.profiling import recorder from ddtrace.profiling.collector import asyncio as collector_asyncio @@ -19,7 +19,7 @@ async def test_lock_acquire_events(): assert len(r.events[collector_asyncio.AsyncioLockReleaseEvent]) == 0 event = r.events[collector_asyncio.AsyncioLockAcquireEvent][0] assert event.lock_name == "test_asyncio.py:15" - assert event.thread_id == nogevent.thread_get_ident() + assert event.thread_id == _thread.get_ident() assert event.wait_time_ns >= 0 # It's called through pytest so I'm sure it's gonna be that long, right? assert len(event.frames) > 3 @@ -40,7 +40,7 @@ async def test_asyncio_lock_release_events(): assert len(r.events[collector_asyncio.AsyncioLockReleaseEvent]) == 1 event = r.events[collector_asyncio.AsyncioLockReleaseEvent][0] assert event.lock_name == "test_asyncio.py:35" - assert event.thread_id == nogevent.thread_get_ident() + assert event.thread_id == _thread.get_ident() assert event.locked_for_ns >= 0 # It's called through pytest so I'm sure it's gonna be that long, right? assert len(event.frames) > 3 diff --git a/tests/profiling/collector/test_memalloc.py b/tests/profiling/collector/test_memalloc.py index e77ebea0466..2fc04ce819d 100644 --- a/tests/profiling/collector/test_memalloc.py +++ b/tests/profiling/collector/test_memalloc.py @@ -11,14 +11,10 @@ except ImportError: pytestmark = pytest.mark.skip("_memalloc not available") -from ddtrace.internal import nogevent from ddtrace.profiling import recorder from ddtrace.profiling.collector import memalloc -TESTING_GEVENT = os.getenv("DD_PROFILE_TEST_GEVENT", False) - - def test_start_twice(): _memalloc.start(64, 1000, 512) with pytest.raises(RuntimeError): @@ -51,14 +47,13 @@ def test_start_stop(): _memalloc.stop() -# This is used by tests and must be equal to the line number where object() is called in _allocate_1k 😉 -_ALLOC_LINE_NUMBER = 59 - - def _allocate_1k(): return [object() for _ in range(1000)] +_ALLOC_LINE_NUMBER = _allocate_1k.__code__.co_firstlineno + 1 + + def _pre_allocate_1k(): return _allocate_1k() @@ -81,7 +76,7 @@ def test_iter_events(): last_call = stack[0] assert size >= 1 # size depends on the object size if last_call[2] == "" and last_call[1] == _ALLOC_LINE_NUMBER: - assert thread_id == nogevent.main_thread_id + assert thread_id == threading.main_thread().ident assert last_call[0] == __file__ assert stack[1][0] == __file__ assert stack[1][1] == _ALLOC_LINE_NUMBER @@ -132,7 +127,7 @@ def test_iter_events_multi_thread(): assert size >= 1 # size depends on the object size if last_call[2] == "" and last_call[1] == _ALLOC_LINE_NUMBER: assert last_call[0] == __file__ - if thread_id == nogevent.main_thread_id: + if thread_id == threading.main_thread().ident: count_object += 1 assert stack[1][0] == __file__ assert stack[1][1] == _ALLOC_LINE_NUMBER @@ -163,17 +158,16 @@ def test_memory_collector(): last_call = event.frames[0] 
assert event.size > 0 if last_call[2] == "" and last_call[1] == _ALLOC_LINE_NUMBER: - assert event.thread_id == nogevent.main_thread_id + assert event.thread_id == threading.main_thread().ident assert event.thread_name == "MainThread" count_object += 1 assert event.frames[2][0] == __file__ - assert event.frames[2][1] == 154 + assert event.frames[2][1] == 149 assert event.frames[2][2] == "test_memory_collector" assert count_object > 0 -@pytest.mark.skipif(TESTING_GEVENT, reason="Test not compatible with gevent") @pytest.mark.parametrize( "ignore_profiler", (True, False), @@ -226,7 +220,7 @@ def test_heap(): for (stack, nframe, thread_id), size in _memalloc.heap(): assert 0 < len(stack) <= max_nframe assert size > 0 - if thread_id == nogevent.main_thread_id: + if thread_id == threading.main_thread().ident: thread_found = True assert isinstance(thread_id, int) if ( diff --git a/tests/profiling/collector/test_stack.py b/tests/profiling/collector/test_stack.py index 80e6e113a49..ac36edceb01 100644 --- a/tests/profiling/collector/test_stack.py +++ b/tests/profiling/collector/test_stack.py @@ -1,5 +1,4 @@ # -*- encoding: utf-8 -*- -import collections import gc import os import sys @@ -7,19 +6,15 @@ import time import timeit from types import FrameType -import typing +import typing # noqa import uuid import pytest import six +from six.moves import _thread -import ddtrace -from ddtrace.internal import compat -from ddtrace.internal import nogevent +import ddtrace # noqa from ddtrace.profiling import _threading -from ddtrace.profiling import collector -from ddtrace.profiling import event as event_mod -from ddtrace.profiling import profiler from ddtrace.profiling import recorder from ddtrace.profiling.collector import stack from ddtrace.profiling.collector import stack_event @@ -47,7 +42,7 @@ def func4(): def func5(): - return nogevent.sleep(1) + return time.sleep(1) def wait_for_event(collector, cond=lambda _: True, retries=10, interval=1): @@ -88,12 +83,8 @@ def test_collect_once(): stack_events = all_events[0] for e in stack_events: if e.thread_name == "MainThread": - if TESTING_GEVENT: - assert e.task_id > 0 - assert e.task_name is not None - else: - assert e.task_id is None - assert e.task_name is None + assert e.task_id is None + assert e.task_name is None assert e.thread_id > 0 assert len(e.frames) >= 1 assert e.frames[0][0].endswith(".py") @@ -135,7 +126,7 @@ def sleep_instance(self): for _ in range(5): if _find_sleep_event(r.events[stack_event.StackSampleEvent], "SomeClass"): return True - nogevent.sleep(1) + time.sleep(1) return False r = recorder.Recorder() @@ -161,7 +152,7 @@ def sleep_instance(foobar, self): for _ in range(5): if _find_sleep_event(r.events[stack_event.StackSampleEvent], ""): return True - nogevent.sleep(1) + time.sleep(1) return False s = stack.StackCollector(r) @@ -180,7 +171,29 @@ def _fib(n): @pytest.mark.skipif(not TESTING_GEVENT, reason="Not testing gevent") +@pytest.mark.subprocess(ddtrace_run=True) def test_collect_gevent_thread_task(): + from gevent import monkey # noqa + + monkey.patch_all() + + import threading + import time + + import pytest + + from ddtrace.profiling import recorder + from ddtrace.profiling.collector import stack + from ddtrace.profiling.collector import stack_event + + def _fib(n): + if n == 1: + return 1 + elif n == 0: + return 0 + else: + return _fib(n - 1) + _fib(n - 2) + r = recorder.Recorder() s = stack.StackCollector(r) @@ -203,10 +216,11 @@ def _dofib(): t.join() for event in r.events[stack_event.StackSampleEvent]: - if 
event.thread_name == "MainThread" and event.task_id in {thread.ident for thread in threads}: + if event.thread_name is None and event.task_id in {thread.ident for thread in threads}: assert event.task_name.startswith("TestThread ") - # This test is not uber-reliable as it has timing issue, therefore if we find one of our TestThread with the - # correct info, we're happy enough to stop here. + # This test is not uber-reliable as it has timing issue, therefore + # if we find one of our TestThread with the correct info, we're + # happy enough to stop here. break else: pytest.fail("No gevent thread found") @@ -240,37 +254,57 @@ def test_no_ignore_profiler_single(): assert thread_id in {e.thread_id for e in events} -class CollectorTest(collector.PeriodicCollector): - def collect(self): - # type: (...) -> typing.Iterable[typing.Iterable[event_mod.Event]] - _fib(22) - return [] - - @pytest.mark.skipif(not TESTING_GEVENT, reason="Not testing gevent") -@pytest.mark.parametrize("ignore", (True, False)) -def test_ignore_profiler_gevent_task(monkeypatch, ignore): - monkeypatch.setenv("DD_PROFILING_API_TIMEOUT", "0.1") - monkeypatch.setenv("DD_PROFILING_IGNORE_PROFILER", str(ignore)) - p = profiler.Profiler() - p.start() - # This test is particularly useful with gevent enabled: create a test collector that run often and for long so we're - # sure to catch it with the StackProfiler and that it's not ignored. - c = CollectorTest(p._profiler._recorder, interval=0.00001) - c.start() +@pytest.mark.subprocess(ddtrace_run=True) +def test_ignore_profiler_gevent_task(): + import gevent.monkey + + gevent.monkey.patch_all() + + import os + import time + + from ddtrace.profiling import collector # noqa + from ddtrace.profiling import event as event_mod # noqa + from ddtrace.profiling import profiler + from ddtrace.profiling.collector import stack_event + + def _fib(n): + if n == 1: + return 1 + elif n == 0: + return 0 + else: + return _fib(n - 1) + _fib(n - 2) + + class CollectorTest(collector.PeriodicCollector): + def collect(self): + # type: (...) -> typing.Iterable[typing.Iterable[event_mod.Event]] + _fib(22) + return [] + + for ignore in (True, False): + os.environ["DD_PROFILING_API_TIMEOUT"] = "0.1" + os.environ["DD_PROFILING_IGNORE_PROFILER"] = str(ignore) + p = profiler.Profiler() + p.start() + # This test is particularly useful with gevent enabled: create a test collector that run often and for long + # we're sure to catch it with the StackProfiler and that it's not ignored. 
+        c = CollectorTest(p._profiler._recorder, interval=0.00001)
+        c.start()
 
-    for _ in range(100):
-        events = p._profiler._recorder.reset()
-        ids = {e.task_id for e in events[stack_event.StackSampleEvent]}
-        if (c._worker.ident in ids) != ignore:
-            break
-        # Give some time for gevent to switch greenlets
-        time.sleep(0.1)
-    else:
-        assert False
+        for _ in range(100):
+            events = p._profiler._recorder.reset()
+            ids = {e.task_id for e in events[stack_event.StackSampleEvent]}
+            if (c._worker.ident in ids) != ignore:
+                break
+            # Give some time for gevent to switch greenlets
+            time.sleep(0.1)
+        else:
+            assert False, "ignore == " + str(ignore)
 
-    c.stop()
-    p.stop(flush=False)
+        c.stop()
+        p.stop(flush=False)
 
 
 def test_collect():
@@ -375,7 +409,6 @@ def test_stress_threads_run_as_thread():
 @pytest.mark.skipif(not stack.FEATURES["stack-exceptions"], reason="Stack exceptions not supported")
-@pytest.mark.skipif(TESTING_GEVENT, reason="Test not compatible with gevent")
 def test_exception_collection_threads():
     NB_THREADS = 5
 
@@ -409,16 +442,18 @@ def test_exception_collection():
     try:
         raise ValueError("hello")
     except Exception:
-        nogevent.sleep(1)
+        time.sleep(1)
 
     exception_events = r.events[stack_event.StackExceptionSampleEvent]
     assert len(exception_events) >= 1
     e = exception_events[0]
     assert e.timestamp > 0
     assert e.sampling_period > 0
-    assert e.thread_id == nogevent.thread_get_ident()
+    assert e.thread_id == _thread.get_ident()
     assert e.thread_name == "MainThread"
-    assert e.frames == [(__file__, 412, "test_exception_collection", "")]
+    assert e.frames == [
+        (__file__, test_exception_collection.__code__.co_firstlineno + 8, "test_exception_collection", "")
+    ]
     assert e.nframes == 1
     assert e.exc_type == ValueError
 
@@ -436,7 +471,7 @@ def test_exception_collection_trace(
     try:
         raise ValueError("hello")
     except Exception:
-        nogevent.sleep(1)
+        time.sleep(1)
 
     # Check we caught an event or retry
     exception_events = r.reset()[stack_event.StackExceptionSampleEvent]
@@ -448,9 +483,11 @@ def test_exception_collection_trace(
     e = exception_events[0]
     assert e.timestamp > 0
     assert e.sampling_period > 0
-    assert e.thread_id == nogevent.thread_get_ident()
+    assert e.thread_id == _thread.get_ident()
     assert e.thread_name == "MainThread"
-    assert e.frames == [(__file__, 439, "test_exception_collection_trace", "")]
+    assert e.frames == [
+        (__file__, test_exception_collection_trace.__code__.co_firstlineno + 13, "test_exception_collection_trace", "")
+    ]
     assert e.nframes == 1
     assert e.exc_type == ValueError
     assert e.span_id == span.span_id
 
@@ -466,12 +503,13 @@ def tracer_and_collector(tracer):
         yield tracer, c
     finally:
         c.stop()
+        tracer.shutdown()
 
 
 def test_thread_to_span_thread_isolation(tracer_and_collector):
     t, c = tracer_and_collector
     root = t.start_span("root", activate=True)
-    thread_id = nogevent.thread_get_ident()
+    thread_id = _thread.get_ident()
     assert c._thread_span_links.get_active_span_from_thread_id(thread_id) == root
     quit_thread = threading.Event()
 
@@ -487,13 +525,8 @@ def start_span():
     th = threading.Thread(target=start_span)
     th.start()
     span_started.wait()
-    if TESTING_GEVENT:
-        # We track *real* threads, gevent is using only one in this case
-        assert c._thread_span_links.get_active_span_from_thread_id(thread_id) == store["span2"]
-        assert c._thread_span_links.get_active_span_from_thread_id(th.ident) is None
-    else:
-        assert c._thread_span_links.get_active_span_from_thread_id(thread_id) == root
-        assert c._thread_span_links.get_active_span_from_thread_id(th.ident) == store["span2"]
+    assert
c._thread_span_links.get_active_span_from_thread_id(thread_id) == root + assert c._thread_span_links.get_active_span_from_thread_id(th.ident) == store["span2"] # Do not quit the thread before we test, otherwise the collector might clean up the thread from the list of spans quit_thread.set() th.join() @@ -502,7 +535,7 @@ def start_span(): def test_thread_to_span_multiple(tracer_and_collector): t, c = tracer_and_collector root = t.start_span("root", activate=True) - thread_id = nogevent.thread_get_ident() + thread_id = _thread.get_ident() assert c._thread_span_links.get_active_span_from_thread_id(thread_id) == root subspan = t.start_span("subtrace", child_of=root, activate=True) assert c._thread_span_links.get_active_span_from_thread_id(thread_id) == subspan @@ -521,7 +554,7 @@ def test_thread_to_child_span_multiple_unknown_thread(tracer_and_collector): def test_thread_to_child_span_clear(tracer_and_collector): t, c = tracer_and_collector root = t.start_span("root", activate=True) - thread_id = nogevent.thread_get_ident() + thread_id = _thread.get_ident() assert c._thread_span_links.get_active_span_from_thread_id(thread_id) == root c._thread_span_links.clear_threads(set()) assert c._thread_span_links.get_active_span_from_thread_id(thread_id) is None @@ -530,7 +563,7 @@ def test_thread_to_child_span_clear(tracer_and_collector): def test_thread_to_child_span_multiple_more_children(tracer_and_collector): t, c = tracer_and_collector root = t.start_span("root", activate=True) - thread_id = nogevent.thread_get_ident() + thread_id = _thread.get_ident() assert c._thread_span_links.get_active_span_from_thread_id(thread_id) == root subspan = t.start_span("subtrace", child_of=root, activate=True) subsubspan = t.start_span("subsubtrace", child_of=subspan, activate=True) @@ -617,14 +650,13 @@ def _trace(): t.join() -@pytest.mark.skipif(TESTING_GEVENT, reason="Test not compatible with gevent") def test_thread_time_cache(): tt = stack._ThreadTime() - lock = nogevent.Lock() + lock = threading.Lock() lock.acquire() - t = nogevent.Thread(target=lock.acquire) + t = threading.Thread(target=lock.acquire) t.start() main_thread_id = threading.current_thread().ident @@ -666,10 +698,23 @@ def test_thread_time_cache(): @pytest.mark.skipif(not TESTING_GEVENT, reason="Not testing gevent") +@pytest.mark.subprocess(ddtrace_run=True) def test_collect_gevent_threads(): + import gevent.monkey + + gevent.monkey.patch_all() + + import collections + import threading + import time + + from ddtrace.profiling import recorder + from ddtrace.profiling.collector import stack + from ddtrace.profiling.collector import stack_event + # type: (...) -> None r = recorder.Recorder() - s = stack.StackCollector(r, ignore_profiler=True, max_time_usage_pct=100) + s = stack.StackCollector(r, max_time_usage_pct=100) iteration = 100 sleep_time = 0.01 @@ -696,11 +741,8 @@ def _nothing(): events = r.events[stack_event.StackSampleEvent] for event in events: - if event.task_id == compat.main_thread.ident: - if event.task_name is None: - pytest.fail("Task with no name detected, is it the Hub?") - else: - main_thread_found = True + if event.task_name == "MainThread": + main_thread_found = True elif event.task_id in {t.ident for t in threads}: for filename, lineno, funcname, classname in event.frames: if funcname in ( @@ -719,16 +761,18 @@ def _nothing(): # sanity check: we don't have duplicate in thread/task ids. 
     assert len(wall_time_ns_per_thread) == nb_threads
 
-    # In theory there should be only one value in this set, but due to timing, it's possible one task has less event, so
-    # we're not checking the len() of values here.
+    # In theory there should be only one value in this set, but due to timing,
+    # it's possible one task has fewer events, so we're not checking the len()
+    # of values here.
     values = set(wall_time_ns_per_thread.values())
-    # NOTE(jd): I'm disabling this check because it works 90% of the test only. There are some cases where this test is
-    # run inside the complete test suite and fails, while it works 100% of the time in its own.
-    # Check that the sum of wall time generated for each task is right.
-    # Accept a 30% margin though, don't be crazy, we're just doing 5 seconds with a lot of tasks.
-    # exact_time = iteration * sleep_time * 1e9
-    # assert (exact_time * 0.7) <= values.pop() <= (exact_time * 1.3)
+    # NOTE(jd): I'm disabling this check because it only works 90% of the time.
+    # There are some cases where this test is run inside the complete test suite
+    # and fails, while it works 100% of the time on its own.
+    # Check that the sum of wall time generated for each task is right.
+    # Accept a 30% margin though, don't be crazy, we're just doing 5 seconds with a lot of tasks.
+    # exact_time = iteration * sleep_time * 1e9
+    # assert (exact_time * 0.7) <= values.pop() <= (exact_time * 1.3)
     assert values.pop() > 0
diff --git a/tests/profiling/collector/test_stack_asyncio.py b/tests/profiling/collector/test_stack_asyncio.py
index 25651825081..48bab01f1c4 100644
--- a/tests/profiling/collector/test_stack_asyncio.py
+++ b/tests/profiling/collector/test_stack_asyncio.py
@@ -1,6 +1,5 @@
 import asyncio
 import collections
-import os
 import sys
 
 import pytest
 
@@ -12,9 +11,6 @@
 from . import _asyncio_compat
 
 
-TESTING_GEVENT = os.getenv("DD_PROFILE_TEST_GEVENT", False)
-
-
 @pytest.mark.skipif(not _asyncio_compat.PY36_AND_LATER, reason="Python > 3.5 needed")
 def test_asyncio(tmp_path, monkeypatch) -> None:
     sleep_time = 0.2
@@ -61,22 +57,18 @@ async def hello() -> None:
         if _asyncio_compat.PY37_AND_LATER:
             if event.task_name == "main":
                 assert event.thread_name == "MainThread"
-                assert event.frames == [(__file__, 30, "hello", "")]
+                assert event.frames == [(__file__, test_asyncio.__code__.co_firstlineno + 12, "hello", "")]
                 assert event.nframes == 1
             elif event.task_name == t1_name:
                 assert event.thread_name == "MainThread"
-                assert event.frames == [(__file__, 24, "stuff", "")]
+                assert event.frames == [(__file__, test_asyncio.__code__.co_firstlineno + 6, "stuff", "")]
                 assert event.nframes == 1
             elif event.task_name == t2_name:
                 assert event.thread_name == "MainThread"
-                assert event.frames == [(__file__, 24, "stuff", "")]
+                assert event.frames == [(__file__, test_asyncio.__code__.co_firstlineno + 6, "stuff", "")]
                 assert event.nframes == 1
-        if event.thread_name == "MainThread" and (
-            # The task name is empty in asyncio (it's not a task) but the main thread is seen as a task in gevent
-            (event.task_name is None and not TESTING_GEVENT)
-            or (event.task_name == "MainThread" and TESTING_GEVENT)
-        ):
+        if event.thread_name == "MainThread" and event.task_name is None:
             # Make sure we account CPU time
             if event.cpu_time_ns > 0:
                 cpu_time_found = True
diff --git a/tests/profiling/collector/test_task.py b/tests/profiling/collector/test_task.py
index 48078d3b667..3faa541fce3 100644
--- a/tests/profiling/collector/test_task.py
+++ b/tests/profiling/collector/test_task.py
@@ -1,10 +1,8 @@
 import os
-import threading
 
 import pytest
 
 from ddtrace.internal import compat
-from ddtrace.internal import nogevent
 from ddtrace.profiling.collector import _task
 
 
@@ -14,18 +12,25 @@ def test_get_task_main():
     # type: (...)
-> None if _task._gevent_tracer is None: - assert _task.get_task(nogevent.main_thread_id) == (None, None, None) - else: - assert _task.get_task(nogevent.main_thread_id) == (compat.main_thread.ident, "MainThread", None) + assert _task.get_task(compat.main_thread.ident) == (None, None, None) -@pytest.mark.skipif(TESTING_GEVENT, reason="only works without gevent") def test_list_tasks_nogevent(): - assert _task.list_tasks(nogevent.main_thread_id) == [] + assert _task.list_tasks(compat.main_thread.ident) == [] @pytest.mark.skipif(not TESTING_GEVENT, reason="only works with gevent") +@pytest.mark.subprocess def test_list_tasks_gevent(): + import gevent.monkey + + gevent.monkey.patch_all() + + import threading + + from ddtrace.internal import compat + from ddtrace.profiling.collector import _task + l1 = threading.Lock() l1.acquire() @@ -39,7 +44,7 @@ def nothing(): t1 = threading.Thread(target=wait, name="t1") t1.start() - tasks = _task.list_tasks(nogevent.main_thread_id) + tasks = _task.list_tasks(compat.main_thread.ident) # can't check == 2 because there are left over from other tests assert len(tasks) >= 2 diff --git a/tests/profiling/collector/test_threading.py b/tests/profiling/collector/test_threading.py index 4cc3bfe14aa..c0d6b7e93b2 100644 --- a/tests/profiling/collector/test_threading.py +++ b/tests/profiling/collector/test_threading.py @@ -3,8 +3,8 @@ import uuid import pytest +from six.moves import _thread -from ddtrace.internal import nogevent from ddtrace.profiling import recorder from ddtrace.profiling.collector import threading as collector_threading @@ -67,7 +67,7 @@ def test_lock_acquire_events(): assert len(r.events[collector_threading.ThreadingLockReleaseEvent]) == 0 event = r.events[collector_threading.ThreadingLockAcquireEvent][0] assert event.lock_name == "test_threading.py:64" - assert event.thread_id == nogevent.thread_get_ident() + assert event.thread_id == _thread.get_ident() assert event.wait_time_ns >= 0 # It's called through pytest so I'm sure it's gonna be that long, right? assert len(event.frames) > 3 @@ -91,7 +91,7 @@ def lockfunc(self): assert len(r.events[collector_threading.ThreadingLockReleaseEvent]) == 0 event = r.events[collector_threading.ThreadingLockAcquireEvent][0] assert event.lock_name == "test_threading.py:85" - assert event.thread_id == nogevent.thread_get_ident() + assert event.thread_id == _thread.get_ident() assert event.wait_time_ns >= 0 # It's called through pytest so I'm sure it's gonna be that long, right? assert len(event.frames) > 3 @@ -206,7 +206,7 @@ def test_lock_release_events(): assert len(r.events[collector_threading.ThreadingLockReleaseEvent]) == 1 event = r.events[collector_threading.ThreadingLockReleaseEvent][0] assert event.lock_name == "test_threading.py:202" - assert event.thread_id == nogevent.thread_get_ident() + assert event.thread_id == _thread.get_ident() assert event.locked_for_ns >= 0 # It's called through pytest so I'm sure it's gonna be that long, right? 
assert len(event.frames) > 3 @@ -215,8 +215,20 @@ def test_lock_release_events(): assert event.sampling_pct == 100 -@pytest.mark.skipif(not TESTING_GEVENT, reason="Not testing gevent") +@pytest.mark.skipif(not TESTING_GEVENT, reason="only works with gevent") +@pytest.mark.subprocess def test_lock_gevent_tasks(): + from gevent import monkey # noqa + + monkey.patch_all() + + import threading + + import pytest + + from ddtrace.profiling import recorder + from ddtrace.profiling.collector import threading as collector_threading + r = recorder.Recorder() def play_with_lock(): @@ -233,35 +245,29 @@ def play_with_lock(): assert len(r.events[collector_threading.ThreadingLockReleaseEvent]) >= 1 for event in r.events[collector_threading.ThreadingLockAcquireEvent]: - if event.lock_name == "test_threading.py:223": - assert event.thread_id == nogevent.main_thread_id + if event.lock_name == "test_threading.py:235": assert event.wait_time_ns >= 0 assert event.task_id == t.ident assert event.task_name == "foobar" # It's called through pytest so I'm sure it's gonna be that long, right? assert len(event.frames) > 3 assert event.nframes > 3 - assert event.frames[0] == (__file__.replace(".pyc", ".py"), 224, "play_with_lock", "") + assert event.frames[0] == ("tests/profiling/collector/test_threading.py", 236, "play_with_lock", "") assert event.sampling_pct == 100 - assert event.task_id == t.ident - assert event.task_name == "foobar" break else: pytest.fail("Lock event not found") for event in r.events[collector_threading.ThreadingLockReleaseEvent]: - if event.lock_name == "test_threading.py:223": - assert event.thread_id == nogevent.main_thread_id + if event.lock_name == "test_threading.py:235": assert event.locked_for_ns >= 0 assert event.task_id == t.ident assert event.task_name == "foobar" # It's called through pytest so I'm sure it's gonna be that long, right? assert len(event.frames) > 3 assert event.nframes > 3 - assert event.frames[0] == (__file__.replace(".pyc", ".py"), 225, "play_with_lock", "") + assert event.frames[0] == ("tests/profiling/collector/test_threading.py", 237, "play_with_lock", "") assert event.sampling_pct == 100 - assert event.task_id == t.ident - assert event.task_name == "foobar" break else: pytest.fail("Lock event not found") diff --git a/tests/profiling/collector/test_threading_asyncio.py b/tests/profiling/collector/test_threading_asyncio.py index 94249fef998..b763a51508b 100644 --- a/tests/profiling/collector/test_threading_asyncio.py +++ b/tests/profiling/collector/test_threading_asyncio.py @@ -1,4 +1,3 @@ -import os import threading import pytest @@ -9,9 +8,6 @@ from . 
import _asyncio_compat -TESTING_GEVENT = os.getenv("DD_PROFILE_TEST_GEVENT", False) - - @pytest.mark.skipif(not _asyncio_compat.PY36_AND_LATER, reason="Python > 3.5 needed") def test_lock_acquire_events(tmp_path, monkeypatch): async def _lock(): @@ -36,16 +32,12 @@ def asyncio_run(): lock_found = 0 for event in events[collector_threading.ThreadingLockAcquireEvent]: - if event.lock_name == "test_threading_asyncio.py:18": + if event.lock_name == "test_threading_asyncio.py:%d" % (test_lock_acquire_events.__code__.co_firstlineno + 3): assert event.task_name.startswith("Task-") lock_found += 1 - elif event.lock_name == "test_threading_asyncio.py:22": - if TESTING_GEVENT: - assert event.task_name == "foobar" - assert event.thread_name == "MainThread" - else: - assert event.task_name is None - assert event.thread_name == "foobar" + elif event.lock_name == "test_threading_asyncio.py:%d" % (test_lock_acquire_events.__code__.co_firstlineno + 7): + assert event.task_name is None + assert event.thread_name == "foobar" lock_found += 1 if lock_found != 2: diff --git a/tests/profiling/run.py b/tests/profiling/run.py index d9848caa55a..b7584fb735d 100644 --- a/tests/profiling/run.py +++ b/tests/profiling/run.py @@ -1,15 +1,7 @@ -import os import runpy import sys -if "DD_PROFILE_TEST_GEVENT" in os.environ: - from gevent import monkey - - monkey.patch_all() - print("=> gevent monkey patching done") - -# TODO Use gevent.monkey once https://github.com/gevent/gevent/pull/1440 is merged? module = sys.argv[1] del sys.argv[0] runpy.run_module(module, run_name="__main__") diff --git a/tests/profiling/simple_program_gevent.py b/tests/profiling/simple_program_gevent.py index d2d516be6ad..d3606cac74b 100644 --- a/tests/profiling/simple_program_gevent.py +++ b/tests/profiling/simple_program_gevent.py @@ -7,7 +7,6 @@ import time from ddtrace.profiling import bootstrap -# do not use ddtrace-run; the monkey-patching would be done too late import ddtrace.profiling.auto from ddtrace.profiling.collector import stack_event diff --git a/tests/profiling/test_accuracy.py b/tests/profiling/test_accuracy.py index ab9bb0ce0c2..0fc19f0185f 100644 --- a/tests/profiling/test_accuracy.py +++ b/tests/profiling/test_accuracy.py @@ -3,8 +3,6 @@ import os import time -import pytest - from ddtrace.internal import compat from ddtrace.profiling import profiler from ddtrace.profiling.collector import stack_event @@ -68,8 +66,6 @@ def total_time(time_data, funcname): return sum(functime[funcname] for functime in time_data.values()) -# This test does not work with gevent since sleeping is interrupted by gevent monkey patched version. -@pytest.mark.skipif(TESTING_GEVENT, reason="Test not compatible with gevent") def test_accuracy(monkeypatch): # Set this to 100 so we don't sleep too often and mess with the precision. monkeypatch.setenv("DD_PROFILING_MAX_TIME_USAGE_PCT", "100") diff --git a/tests/profiling/test_gunicorn.py b/tests/profiling/test_gunicorn.py index 4171e9128e1..e3fb84d7f2e 100644 --- a/tests/profiling/test_gunicorn.py +++ b/tests/profiling/test_gunicorn.py @@ -70,14 +70,3 @@ def _test_gunicorn(gunicorn, tmp_path, monkeypatch, *args): def test_gunicorn(gunicorn, tmp_path, monkeypatch): # type: (...) -> None _test_gunicorn(gunicorn, tmp_path, monkeypatch) - - -# This test does not work when run run via pytest: -# [CRITICAL] WORKER TIMEOUT (pid:33923) -# [WARNING] Worker with pid 33923 was terminated due to signal 6 -# There's something odd going on with gevent which prevents this from working. 
-@pytest.mark.skipif(not TESTING_GEVENT, reason="Not testing gevent") -def test_gunicorn_gevent(gunicorn, tmp_path, monkeypatch): - # type: (...) -> None - monkeypatch.setenv("DD_GEVENT_PATCH_ALL", "1") - _test_gunicorn(gunicorn, tmp_path, monkeypatch, "--worker-class", "gevent") diff --git a/tests/profiling/test_nogevent.py b/tests/profiling/test_nogevent.py deleted file mode 100644 index c8db62f1dd6..00000000000 --- a/tests/profiling/test_nogevent.py +++ /dev/null @@ -1,16 +0,0 @@ -import os - -import pytest -import six - -from ddtrace.internal import nogevent - - -TESTING_GEVENT = os.getenv("DD_PROFILE_TEST_GEVENT", False) - - -@pytest.mark.skipif(not TESTING_GEVENT or six.PY3, reason="Not testing gevent or testing on Python 3") -def test_nogevent_rlock(): - import gevent - - assert not isinstance(nogevent.RLock()._RLock__block, gevent.thread.LockType) diff --git a/tests/tracer/test_periodic.py b/tests/tracer/test_periodic.py index 4b8bc2ecd50..5b14f5e5c1e 100644 --- a/tests/tracer/test_periodic.py +++ b/tests/tracer/test_periodic.py @@ -1,5 +1,5 @@ -import os import threading +from threading import Event from time import sleep import pytest @@ -8,35 +8,6 @@ from ddtrace.internal import service -if os.getenv("DD_PROFILE_TEST_GEVENT", False): - import gevent - - class Event(object): - """ - We can't use gevent Events here[0], nor can we use native threading - events (because gevent is not multi-threaded). - - So for gevent, since it's not multi-threaded and will not run greenlets - in parallel (for our usage here, anyway) we can write a dummy Event - class which just does a simple busy wait on a shared variable. - - [0] https://github.com/gevent/gevent/issues/891 - """ - - state = False - - def wait(self): - while not self.state: - gevent.sleep(0.001) - - def set(self): - self.state = True - - -else: - Event = threading.Event - - def test_periodic(): x = {"OK": False} @@ -51,7 +22,7 @@ def _run_periodic(): def _on_shutdown(): x["DOWN"] = True - t = periodic.PeriodicRealThreadClass()(0.001, _run_periodic, on_shutdown=_on_shutdown) + t = periodic.PeriodicThread(0.001, _run_periodic, on_shutdown=_on_shutdown) t.start() thread_started.wait() thread_continue.set() @@ -69,7 +40,7 @@ def test_periodic_double_start(): def _run_periodic(): pass - t = periodic.PeriodicRealThreadClass()(0.1, _run_periodic) + t = periodic.PeriodicThread(0.1, _run_periodic) t.start() with pytest.raises(RuntimeError): t.start() @@ -89,7 +60,7 @@ def _run_periodic(): def _on_shutdown(): x["DOWN"] = True - t = periodic.PeriodicRealThreadClass()(0.001, _run_periodic, on_shutdown=_on_shutdown) + t = periodic.PeriodicThread(0.001, _run_periodic, on_shutdown=_on_shutdown) t.start() thread_started.wait() thread_continue.set() @@ -98,13 +69,6 @@ def _on_shutdown(): assert "DOWN" not in x -def test_gevent_class(): - if os.getenv("DD_PROFILE_TEST_GEVENT", False): - assert isinstance(periodic.PeriodicRealThreadClass()(1, sum), periodic._GeventPeriodicThread) - else: - assert isinstance(periodic.PeriodicRealThreadClass()(1, sum), periodic.PeriodicThread) - - def test_periodic_service_start_stop(): t = periodic.PeriodicService(1) t.start() @@ -138,7 +102,7 @@ def test_is_alive_before_start(): def x(): pass - t = periodic.PeriodicRealThreadClass()(1, x) + t = periodic.PeriodicThread(1, x) assert not t.is_alive()
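
The converted tests above all follow the same recipe: run the body under @pytest.mark.subprocess (optionally with ddtrace_run=True), call gevent.monkey.patch_all() as the very first step inside the subprocess, import every dependency inside the function so it sees the patched modules, and derive expected line numbers from __code__.co_firstlineno instead of hardcoding them. Below is a minimal sketch of that recipe applied to a lock-collector test; the test name is hypothetical, and it assumes ThreadingLockCollector can be used as a context manager with capture_pct=100, which is not shown in this diff.

import pytest


@pytest.mark.subprocess(ddtrace_run=True)
def test_lock_events_under_gevent():
    # Patch before anything else in the subprocess, mirroring a real gevent application.
    from gevent import monkey  # noqa

    monkey.patch_all()

    # Import test dependencies only after patching so they use the gevent-patched modules.
    import threading

    from ddtrace.profiling import recorder
    from ddtrace.profiling.collector import threading as collector_threading

    r = recorder.Recorder()

    def play_with_lock():
        lock = threading.Lock()  # the recorded lock_name points at this line
        lock.acquire()
        lock.release()

    # Assumed helper: ThreadingLockCollector driven as a context manager with capture_pct=100.
    with collector_threading.ThreadingLockCollector(r, capture_pct=100):
        t = threading.Thread(target=play_with_lock, name="foobar")
        t.start()
        t.join()

    # Compute the expected lock line relative to the function start, the same
    # co_firstlineno trick used throughout this diff, so the assertion survives
    # edits made above this function.
    expected_line = play_with_lock.__code__.co_firstlineno + 1
    assert any(
        event.lock_name.endswith(":%d" % expected_line)
        for event in r.events[collector_threading.ThreadingLockAcquireEvent]
    )

The same co_firstlineno arithmetic could also replace the hardcoded lock_name strings still present elsewhere in this diff (for example "test_threading.py:235"), which would keep those assertions stable when code above them moves.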