fix(rcm): fix stuck child processes when a gevent application creates forks #5088

Closed
wants to merge 61 commits into from
Changes from 46 commits
Commits
358c6f2
fix(rcm): stuck childs when app use os.fork
avara1986 Feb 9, 2023
391a0ec
fix(rcm): validate env var in internal writer
avara1986 Feb 9, 2023
15456e2
fix(rcm): validate if exists products to report to the agent
avara1986 Feb 9, 2023
cd68983
chore(asm): improve changelog
juanjux Feb 9, 2023
7a2f748
chore(asm): update gevent docs
juanjux Feb 9, 2023
8d65850
chore(rcm): add regression test
juanjux Feb 9, 2023
0bb4084
fix(rcm): fix tests
avara1986 Feb 9, 2023
23d8dc0
fix(rcm): fix tests
avara1986 Feb 9, 2023
e644f3d
chore(rcm): refactor, rename remote config class
avara1986 Feb 9, 2023
771b778
chore: require requests for tests
juanjux Feb 9, 2023
fe8e1e3
fix(rcm): fix tests
avara1986 Feb 9, 2023
bfcaa2b
Merge branch '1.x' into avara1986/APPSEC-8131-rcm-forks-error
avara1986 Feb 9, 2023
f8cd656
fix(rcm): fix tests
avara1986 Feb 9, 2023
f604d90
Merge remote-tracking branch 'origin/avara1986/APPSEC-8131-rcm-forks-…
avara1986 Feb 9, 2023
fb4ae3e
fix(rcm): fix tests
avara1986 Feb 9, 2023
3357556
fix(rcm): comment debug test
avara1986 Feb 9, 2023
e4faf2b
fix(rcm): fix tests
avara1986 Feb 9, 2023
432e0d9
fix(rcm): fix tests
avara1986 Feb 9, 2023
ea1c0ef
fix(rcm): fix tests
avara1986 Feb 9, 2023
189018c
fix(rcm): fix tests
avara1986 Feb 9, 2023
3db6e0c
Merge branch '1.x' into avara1986/APPSEC-8131-rcm-forks-error
juanjux Feb 10, 2023
c2e7e32
fix(rcm): fix gunicorn tests in py3.11
avara1986 Feb 10, 2023
8277093
fix(rcm): fix debugger tests
avara1986 Feb 10, 2023
f5ea437
fix(rcm): fix stuck tests
avara1986 Feb 10, 2023
2ddd051
fix(rcm): fix tests
avara1986 Feb 10, 2023
c86af1f
fix(rcm): fix tests
avara1986 Feb 10, 2023
fb4aafc
fix(rcm): fix tests
avara1986 Feb 10, 2023
17e7348
fix(rcm): fix tests
avara1986 Feb 10, 2023
b957708
fix(rcm): fix gunicorn tests
avara1986 Feb 10, 2023
d936306
fix(rcm): fix gunicorn tests
avara1986 Feb 10, 2023
364bd68
fix(rcm): fix debugger
avara1986 Feb 10, 2023
4b2c781
Merge branch '1.x' into avara1986/APPSEC-8131-rcm-forks-error
avara1986 Feb 10, 2023
7e236b7
fix(rcm): fix tests
avara1986 Feb 10, 2023
414a43b
fix(rcm): fix tests
avara1986 Feb 10, 2023
99e25c3
fix(rcm): fix tests
avara1986 Feb 10, 2023
ab6b415
fix(rcm): fix tests
avara1986 Feb 10, 2023
8015462
Merge branch '1.x' into avara1986/APPSEC-8131-rcm-forks-error
avara1986 Feb 13, 2023
60745ea
fix(rcm): fix tests
avara1986 Feb 13, 2023
40e1a37
Merge branch '1.x' into avara1986/APPSEC-8131-rcm-forks-error
avara1986 Feb 13, 2023
fc667a0
fix(rcm): fix tests
avara1986 Feb 13, 2023
575a6b1
fix(rcm): fix tests
avara1986 Feb 13, 2023
cd47779
fix(rcm): fix tests
avara1986 Feb 13, 2023
ee49c32
Merge branch '1.x' into avara1986/APPSEC-8131-rcm-forks-error
avara1986 Feb 13, 2023
7f6f54d
Merge branch '1.x' into avara1986/APPSEC-8131-rcm-forks-error
avara1986 Feb 14, 2023
c650f29
Merge branch '1.x' into avara1986/APPSEC-8131-rcm-forks-error
avara1986 Feb 14, 2023
04b981d
Merge branch '1.x' into avara1986/APPSEC-8131-rcm-forks-error
avara1986 Feb 14, 2023
703bddb
Merge branch '1.x' into avara1986/APPSEC-8131-rcm-forks-error
avara1986 Feb 14, 2023
60f9b7b
Merge branch '1.x' into avara1986/APPSEC-8131-rcm-forks-error
juanjux Feb 15, 2023
c4a2779
Merge branch '1.x' into avara1986/APPSEC-8131-rcm-forks-error
avara1986 Feb 16, 2023
8a32ba5
Merge branch '1.x' into avara1986/APPSEC-8131-rcm-forks-error
avara1986 Feb 16, 2023
39ba43f
Merge branch '1.x' into avara1986/APPSEC-8131-rcm-forks-error
avara1986 Feb 16, 2023
455a667
Merge branch '1.x' into avara1986/APPSEC-8131-rcm-forks-error
avara1986 Feb 16, 2023
8e67129
Merge branch '1.x' into avara1986/APPSEC-8131-rcm-forks-error
avara1986 Feb 17, 2023
70ff85c
Merge branch '1.x' into avara1986/APPSEC-8131-rcm-forks-error
emmettbutler Feb 17, 2023
f73fc22
Merge branch '1.x' into avara1986/APPSEC-8131-rcm-forks-error
avara1986 Feb 28, 2023
93cee48
chore(rcm): fix tests
avara1986 Feb 28, 2023
8667beb
Merge branch '1.x' into avara1986/APPSEC-8131-rcm-forks-error
avara1986 Feb 28, 2023
2af6170
Merge branch '1.x' into avara1986/APPSEC-8131-rcm-forks-error
avara1986 Feb 28, 2023
6f8dcb9
Merge branch '1.x' into avara1986/APPSEC-8131-rcm-forks-error
avara1986 Feb 28, 2023
4315091
Merge branch '1.x' into avara1986/APPSEC-8131-rcm-forks-error
avara1986 Mar 1, 2023
f72ae72
Merge branch '1.x' into avara1986/APPSEC-8131-rcm-forks-error
avara1986 Mar 1, 2023
30 changes: 17 additions & 13 deletions ddtrace/appsec/_remoteconfiguration.py
@@ -41,16 +41,18 @@ def enable_appsec_rc():
from ddtrace import tracer

if _appsec_rc_features_is_enabled():
from ddtrace.internal.remoteconfig import RemoteConfig
from ddtrace.internal.remoteconfig import remoteconfig_poller

RemoteConfig.register(PRODUCTS.ASM_FEATURES, appsec_rc_reload_features(tracer))
remoteconfig_poller.register(PRODUCTS.ASM_FEATURES, appsec_rc_reload_features(tracer))

if tracer._appsec_enabled:
from ddtrace.internal.remoteconfig import RemoteConfig
from ddtrace.internal.remoteconfig import remoteconfig_poller

RemoteConfig.register(PRODUCTS.ASM_DATA, appsec_rc_reload_features(tracer)) # IP Blocking
RemoteConfig.register(PRODUCTS.ASM, appsec_rc_reload_features(tracer)) # Exclusion Filters & Custom Rules
RemoteConfig.register(PRODUCTS.ASM_DD, appsec_rc_reload_features(tracer)) # DD Rules
remoteconfig_poller.register(PRODUCTS.ASM_DATA, appsec_rc_reload_features(tracer)) # IP Blocking
remoteconfig_poller.register(
PRODUCTS.ASM, appsec_rc_reload_features(tracer)
) # Exclusion Filters & Custom Rules
remoteconfig_poller.register(PRODUCTS.ASM_DD, appsec_rc_reload_features(tracer)) # DD Rules


def _add_rules_to_list(features, feature, message, rule_list):
@@ -87,23 +89,25 @@ def _appsec_1click_activation(tracer, features):

if rc_appsec_enabled is not None:
from ddtrace.appsec._constants import PRODUCTS
from ddtrace.internal.remoteconfig import RemoteConfig
from ddtrace.internal.remoteconfig import remoteconfig_poller

log.debug("Reloading Appsec 1-click: %s", rc_appsec_enabled)

if rc_appsec_enabled:
RemoteConfig.register(PRODUCTS.ASM_DATA, appsec_rc_reload_features(tracer)) # IP Blocking
RemoteConfig.register(PRODUCTS.ASM, appsec_rc_reload_features(tracer)) # Exclusion Filters & Custom Rules
RemoteConfig.register(PRODUCTS.ASM_DD, appsec_rc_reload_features(tracer)) # DD Rules
remoteconfig_poller.register(PRODUCTS.ASM_DATA, appsec_rc_reload_features(tracer)) # IP Blocking
remoteconfig_poller.register(
PRODUCTS.ASM, appsec_rc_reload_features(tracer)
) # Exclusion Filters & Custom Rules
remoteconfig_poller.register(PRODUCTS.ASM_DD, appsec_rc_reload_features(tracer)) # DD Rules
if not tracer._appsec_enabled:
tracer.configure(appsec_enabled=True)
else:
config._appsec_enabled = True

else:
RemoteConfig.unregister(PRODUCTS.ASM_DATA)
RemoteConfig.unregister(PRODUCTS.ASM)
RemoteConfig.unregister(PRODUCTS.ASM_DD)
remoteconfig_poller.unregister(PRODUCTS.ASM_DATA)
remoteconfig_poller.unregister(PRODUCTS.ASM)
remoteconfig_poller.unregister(PRODUCTS.ASM_DD)
if tracer._appsec_enabled:
tracer.configure(appsec_enabled=False)
else:
2 changes: 1 addition & 1 deletion ddtrace/contrib/gevent/__init__.py
@@ -7,7 +7,7 @@
.. note::
If :ref:`ddtrace-run<ddtracerun>` is being used set ``DD_GEVENT_PATCH_ALL=true`` and
``gevent.monkey.patch_all()`` will be called as early as possible in the application
to avoid patching conflicts.
to avoid patching conflicts, which could cause several problems including stuck processes.
If ``ddtrace-run`` is not being used then be sure to call ``gevent.monkey.patch_all``
before importing ``ddtrace`` and calling ``ddtrace.patch`` or ``ddtrace.patch_all``.

9 changes: 6 additions & 3 deletions ddtrace/debugging/_debugger.py
@@ -1,5 +1,6 @@
from collections import defaultdict
from itertools import chain
import os
import sys
import threading
from types import FunctionType
@@ -54,9 +55,10 @@
from ddtrace.internal.module import unregister_post_run_module_hook
from ddtrace.internal.rate_limiter import BudgetRateLimiterWithJitter as RateLimiter
from ddtrace.internal.rate_limiter import RateLimitExceeded
from ddtrace.internal.remoteconfig import RemoteConfig
from ddtrace.internal.remoteconfig import remoteconfig_poller
from ddtrace.internal.safety import _isinstance
from ddtrace.internal.service import Service
from ddtrace.internal.utils.formats import asbool
from ddtrace.internal.wrapping import Wrapper


@@ -246,8 +248,9 @@ def __init__(self, tracer=None):
)

# Register the debugger with the RCM client.
RemoteConfig.register("LIVE_DEBUGGING", self.__rc_adapter__(self._on_configuration))

if asbool(os.getenv("DD_REMOTE_CONFIGURATION_ENABLED", True)):
remoteconfig_poller.enable()
remoteconfig_poller.register("LIVE_DEBUGGING", self.__rc_adapter__(self._on_configuration))
log.debug("%s initialized (service name: %s)", self.__class__.__name__, service_name)

def _on_encoder_buffer_full(self, item, encoded):
3 changes: 3 additions & 0 deletions ddtrace/internal/agent.py
@@ -155,6 +155,9 @@ def _healthcheck():
_conn.request("GET", "info", {}, {"content-type": "application/json"})
resp = _conn.getresponse()
data = resp.read()
except OSError:
log.warning("Unexpected connection error with Agent", exc_info=True)
return False
finally:
_conn.close()
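
The hunk above adds an `except OSError` branch so that a failed connection to the Agent is logged and reported as `False` rather than propagating to the caller. A minimal standalone sketch of that pattern follows. The names `healthcheck` and `conn_factory`, the default host and port, and the status check are assumptions made for illustration; this is not ddtrace's `_healthcheck`.

```python
import http.client
import json
import logging

log = logging.getLogger(__name__)


def _default_conn():
    # Assumed agent address, for illustration only.
    return http.client.HTTPConnection("localhost", 8126, timeout=2)


def healthcheck(conn_factory=_default_conn):
    """Return the decoded /info payload, or False if the agent is unreachable.

    Hypothetical helper: ``conn_factory`` is injectable so the error path
    can be exercised without a real network connection.
    """
    conn = conn_factory()
    try:
        conn.request("GET", "/info", headers={"content-type": "application/json"})
        resp = conn.getresponse()
        data = resp.read()
    except OSError:
        # Connection refused, timeouts, and similar socket errors land here.
        log.warning("Unexpected connection error with Agent", exc_info=True)
        return False
    finally:
        # Runs on both the success and the error path.
        conn.close()
    if resp.status != 200:
        return False
    return json.loads(data)
```

Catching `OSError` covers `ConnectionRefusedError` and `socket.timeout`, which are subclasses of it in Python 3, so a missing or slow Agent degrades to "remote config not available" instead of an exception at startup.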

129 changes: 87 additions & 42 deletions ddtrace/internal/remoteconfig/__init__.py
@@ -1,22 +1,60 @@
import logging
import os
from typing import Optional

from ddtrace.internal import agent
from ddtrace.internal import atexit
from ddtrace.internal import forksafe
from ddtrace.internal import periodic
from ddtrace.internal.logger import get_logger
from ddtrace.internal.remoteconfig.client import RemoteConfigClient
from ddtrace.internal.remoteconfig.constants import REMOTE_CONFIG_AGENT_ENDPOINT
from ddtrace.internal.remoteconfig.worker import RemoteConfigWorker
from ddtrace.internal.service import ServiceStatus
from ddtrace.internal.utils.time import StopWatch
from ddtrace.vendor.debtcollector import deprecate


log = get_logger(__name__)

DEFAULT_REMOTECONFIG_POLL_SECONDS = 5.0 # seconds

class RemoteConfig(object):
_worker = None
_worker_lock = forksafe.Lock()

@classmethod
def _check_remote_config_enable_in_agent(cls):
def get_poll_interval_seconds():
# type:() -> float
if os.getenv("DD_REMOTECONFIG_POLL_SECONDS"):
deprecate(
"Using environment variable 'DD_REMOTECONFIG_POLL_SECONDS' is deprecated",
message="Please use DD_REMOTE_CONFIG_POLL_INTERVAL_SECONDS instead.",
removal_version="2.0.0",
)
return float(
os.getenv(
"DD_REMOTE_CONFIG_POLL_INTERVAL_SECONDS",
default=os.getenv("DD_REMOTECONFIG_POLL_SECONDS", default=DEFAULT_REMOTECONFIG_POLL_SECONDS),
)
)


class RemoteConfigPoller(periodic.PeriodicService):
def __init__(self):
super(RemoteConfigPoller, self).__init__(interval=get_poll_interval_seconds())
self._client = RemoteConfigClient()
log.debug("RemoteConfigPoller created with polling interval %s", get_poll_interval_seconds())

def periodic(self):
# type: () -> None
with StopWatch() as sw:
if self._client.has_products():
self._client.request()

t = sw.elapsed()
if t >= self.interval:
log_level = logging.WARNING
else:
log_level = logging.DEBUG
log.log(log_level, "request config in %.5fs to %s", t, self._client.agent_url)

@staticmethod
def _check_remote_config_enable_in_agent():
# type: () -> Optional[bool]
data = agent._healthcheck()
if data and data.get("endpoints"):
@@ -34,49 +72,56 @@ def _check_remote_config_enable_in_agent(cls):
)
return False

@classmethod
def enable(cls):
# type: () -> bool
if cls._check_remote_config_enable_in_agent():
with cls._worker_lock:
if cls._worker is None:
cls._worker = RemoteConfigWorker()
cls._worker.start()

forksafe.register(cls._restart)
atexit.register(cls.disable)
return True
return False
def enable(self):
# type: () -> None
"""
Enable the remote configuration polling service. If the service is already
running, this method does nothing. Use ``disable`` to turn off the polling service.
"""
if self.status == ServiceStatus.RUNNING:
return

if self._check_remote_config_enable_in_agent():
self.start()

atexit.register(self.stop)

def disable(self):
# type: () -> None
"""
Disable the remote config service. Remote config can be re-enabled
by calling ``enable`` again.
"""
if self.status == ServiceStatus.STOPPED:
return

@classmethod
def _restart(cls):
cls.disable()
cls.enable()
atexit.unregister(self.stop)

@classmethod
def register(cls, product, handler):
self.stop()

def shutdown(self, timeout):
# type: (Optional[float]) -> None
self.stop(timeout)

def _stop_service(
self,
timeout=None, # type: Optional[float]
):
# type: (...) -> None
super(RemoteConfigPoller, self)._stop_service()
self.join(timeout=timeout)

def register(self, product, handler):
try:
# By enabling on registration we ensure we start the RCM client only
# if there is at least one registered product.
if cls.enable():
cls._worker._client.register_product(product, handler)
self._client.register_product(product, handler)
except Exception:
log.warning("error starting the RCM client", exc_info=True)

@classmethod
def unregister(cls, product):
def unregister(self, product):
try:
cls._worker._client.unregister_product(product)
self._client.unregister_product(product)
except Exception:
log.warning("error unregistering product from the RCM client", exc_info=True)

@classmethod
def disable(cls):
# type: () -> None
with cls._worker_lock:
if cls._worker is not None:
cls._worker.stop()
cls._worker = None

forksafe.unregister(cls._restart)
atexit.unregister(cls.disable)
remoteconfig_poller = RemoteConfigPoller()
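
`get_poll_interval_seconds` in the diff above prefers `DD_REMOTE_CONFIG_POLL_INTERVAL_SECONDS` and falls back to the deprecated `DD_REMOTECONFIG_POLL_SECONDS`, then to a 5-second default. The precedence can be sketched in isolation as below. This is an illustration under stated assumptions: it uses the standard library's `warnings` module in place of the vendored `debtcollector`, and the `env` parameter and the name `poll_interval_seconds` are hypothetical additions to make the lookup testable.

```python
import os
import warnings

DEFAULT_POLL_SECONDS = 5.0


def poll_interval_seconds(env=None):
    """Resolve the polling interval from a mapping of environment variables."""
    env = os.environ if env is None else env

    # Warn whenever the deprecated name is set, even if the new one overrides it.
    if env.get("DD_REMOTECONFIG_POLL_SECONDS"):
        warnings.warn(
            "DD_REMOTECONFIG_POLL_SECONDS is deprecated; "
            "use DD_REMOTE_CONFIG_POLL_INTERVAL_SECONDS instead.",
            DeprecationWarning,
            stacklevel=2,
        )

    # New name wins; the deprecated name is only a fallback; then the default.
    fallback = env.get("DD_REMOTECONFIG_POLL_SECONDS", DEFAULT_POLL_SECONDS)
    return float(env.get("DD_REMOTE_CONFIG_POLL_INTERVAL_SECONDS", fallback))
```

Passing the environment as a mapping keeps the function free of global state, which makes each precedence case easy to check in a unit test.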
3 changes: 3 additions & 0 deletions ddtrace/internal/remoteconfig/client.py
@@ -245,6 +245,9 @@ def base64_to_struct(val, cls):
self._last_error = None # type: Optional[str]
self._backend_state = None # type: Optional[str]

def has_products(self):
return bool(self._products)

def register_product(self, product_name, func=None):
# type: (str, Optional[ProductCallback]) -> None
if func is not None:
49 changes: 0 additions & 49 deletions ddtrace/internal/remoteconfig/worker.py

This file was deleted.

6 changes: 5 additions & 1 deletion ddtrace/internal/writer.py
@@ -36,6 +36,7 @@
from .encoding import JSONEncoderV2
from .encoding import MSGPACK_ENCODERS
from .logger import get_logger
from .remoteconfig import remoteconfig_poller
from .runtime import container
from .sma import SimpleMovingAverage

@@ -532,7 +533,10 @@ def write(self, spans=None):
# are initialized
if asbool(os.getenv("DD_INSTRUMENTATION_TELEMETRY_ENABLED", True)):
telemetry_writer.enable()
# appsec remote config should be enabled/started after the global tracer and configs
# remote config should be enabled/started after the global tracer and configs are initialized
if asbool(os.environ.get("DD_REMOTE_CONFIGURATION_ENABLED", "true")):
remoteconfig_poller.enable()
# appsec remote config products should be enabled/started after the global tracer and configs
# are initialized
enable_appsec_rc()
except service.ServiceStatusError:
8 changes: 8 additions & 0 deletions releasenotes/notes/rcm-fix-forks-0575a74fb6009c4d.yaml
@@ -0,0 +1,8 @@
---
fixes:
- |
Fix for the Remote Configuration Worker creating stuck children under gevent
when an application uses ``os.fork``. When running Remote Configuration Management
with gevent, ensure that you set the ``DD_GEVENT_PATCH_ALL`` environment variable to ``true``. Remote Configuration
Management is enabled by default. To disable it, set the ``DD_REMOTE_CONFIGURATION_ENABLED``
environment variable to ``false``.
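
The release note describes a default-on switch: Remote Configuration Management runs unless `DD_REMOTE_CONFIGURATION_ENABLED` is set to `false`. A minimal sketch of reading such a flag is shown below. `env_flag` is a hypothetical helper, not ddtrace's `asbool`, and the set of accepted truthy spellings is an assumption.

```python
import os


def env_flag(name, default=True, env=None):
    """Read a boolean environment variable, returning ``default`` when unset."""
    env = os.environ if env is None else env
    raw = env.get(name)
    if raw is None:
        # Variable not set: the feature keeps its default state.
        return default
    # Assumed truthy spellings; anything else is treated as "off".
    return raw.strip().lower() in ("true", "1")


if env_flag("DD_REMOTE_CONFIGURATION_ENABLED"):
    # In the PR this is where ``remoteconfig_poller.enable()`` is called.
    pass
```

Defaulting to `True` when the variable is absent matches the "enabled by default" behaviour stated in the note, so only an explicit opt-out disables the poller.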
23 changes: 17 additions & 6 deletions riotfile.py
@@ -347,20 +347,31 @@ def select_pys(min_version=MIN_PYTHON_VERSION, max_version=MAX_PYTHON_VERSION):
Venv(
name="internal",
command="pytest {cmdargs} tests/internal/",
pkgs={
"httpretty": "==0.9.7",
"gevent": latest,
},
pkgs={"httpretty": "==0.9.7", "gevent": latest, "requests": latest},
venvs=[
Venv(pys="2.7", pkgs={"packaging": ["==17.1", latest]}),
Venv(
pys=select_pys(min_version="3.5", max_version="3.6"),
pkgs={"pytest-asyncio": latest, "packaging": ["==17.1", latest]},
pkgs={
"httpretty": "==0.9.7",
"requests": latest,
"pytest-asyncio": latest,
"gunicorn": latest,
"flask": latest,
"packaging": ["==17.1", latest],
},
),
# FIXME[bytecode-3.11]: internal depends on bytecode, which is not python 3.11 compatible.
Venv(
pys=select_pys(min_version="3.7"),
pkgs={"pytest-asyncio": latest, "packaging": ["==17.1", "==22.0", latest]},
pkgs={
"httpretty": "==0.9.7",
"requests": latest,
"pytest-asyncio": latest,
"gunicorn": latest,
"flask": latest,
"packaging": ["==17.1", "==22.0", latest],
},
),
],
),