From 176c4b4dc6dd8c23465df75a1c9db3ad054b0884 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 26 Sep 2023 23:53:01 +0000 Subject: [PATCH] fix(telemetry): support python3.12 [backport 2.0] (#7055) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Backport 065784b511b045f490113b2b2658d15aaa16ce7b from #7043 to 2.0. ## Description This PR ensures the instrumentation telemetry client is compatible with python3.12's threading module by: - Ensuring the telemetry writer thread is only started once per application. - Ensures the telemetry writer thread is disabled when an application is shutdown. - Ensures telemetry metrics are queued `SpanAggregator.shutdown` without restarting the telemetry writer thread. ## Motivation The following change failed to support the telemetry client in python3.12: https://github.com/DataDog/dd-trace-py/pull/6859. This PR will hopefully fix this 🤞. ### Reproduction for python3.12 runtime errors ``` docker run --rm -it python:3.12.0rc3 bash root@a1f3c1d307ec:/# pip install ddtrace==2.0.0rc2 root@a1f3c1d307ec:/# python -c "import ddtrace; _ = ddtrace.tracer.trace('foo'); raise Exception" ``` ### Output ``` Traceback (most recent call last): File "", line 1, in Exception Exception ignored in atexit callback: > Traceback (most recent call last): File "/usr/local/lib/python3.12/site-packages/ddtrace/tracer.py", line 293, in _atexit self.shutdown(timeout=self.SHUTDOWN_TIMEOUT) File "/usr/local/lib/python3.12/site-packages/ddtrace/tracer.py", line 1024, in shutdown processor.shutdown(timeout) File "/usr/local/lib/python3.12/site-packages/ddtrace/internal/processor/trace.py", line 270, in shutdown self._queue_span_count_metrics("spans_created", "integration_name", None) File "/usr/local/lib/python3.12/site-packages/ddtrace/internal/processor/trace.py", line 291, in _queue_span_count_metrics telemetry_writer.add_count_metric( File 
"/usr/local/lib/python3.12/site-packages/ddtrace/internal/telemetry/writer.py", line 514, in add_count_metric if self.status == ServiceStatus.RUNNING or self.enable(): ^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/site-packages/ddtrace/internal/telemetry/writer.py", line 218, in enable self.start() File "/usr/local/lib/python3.12/site-packages/ddtrace/internal/service.py", line 58, in start self._start_service(*args, **kwargs) File "/usr/local/lib/python3.12/site-packages/ddtrace/internal/periodic.py", line 135, in _start_service self._worker.start() File "/usr/local/lib/python3.12/threading.py", line 971, in start _start_new_thread(self._bootstrap, ()) RuntimeError: can't create new thread at interpreter shutdown ``` ## Risk ~~This PR reverts an optimization that ensured telemetry span creation metrics were queued in batches of 100. Without this optimization we can expect a 5-10% increase to span creation and span finish.~~ ## Checklist - [x] Change(s) are motivated and described in the PR description. - [x] Testing strategy is described if automated tests are not included in the PR. - [x] Risk is outlined (performance impact, potential for breakage, maintainability, etc). - [x] Change is maintainable (easy to change, telemetry, documentation). - [x] [Library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) are followed. If no release note is required, add label `changelog/no-changelog`. - [x] Documentation is included (in-code, generated user docs, [public corp docs](https://github.com/DataDog/documentation/)). - [x] Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [x] Title is accurate. - [x] No unnecessary changes are introduced. - [x] Description motivates each change. - [x] Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes unless absolutely necessary. 
- [x] Testing strategy adequately addresses listed risk(s). - [x] Change is maintainable (easy to change, telemetry, documentation). - [x] Release note makes sense to a user of the library. - [x] Reviewer has explicitly acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment. - [x] Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) - [x] If this PR touches code that signs or publishes builds or packages, or handles credentials of any kind, I've requested a review from `@DataDog/security-design-and-guidance`. - [x] This PR doesn't touch any of that. Co-authored-by: Munir Abdinur --- ddtrace/internal/processor/trace.py | 24 +++++++++------ ddtrace/internal/telemetry/__init__.py | 3 +- ddtrace/internal/telemetry/writer.py | 14 +++++---- ddtrace/internal/writer/writer.py | 5 ++-- tests/appsec/conftest.py | 4 +-- tests/telemetry/app.py | 8 +++++ tests/telemetry/test_telemetry.py | 41 +++++++++++++++++++------- tests/telemetry/test_writer.py | 14 +++------ 8 files changed, 71 insertions(+), 42 deletions(-) diff --git a/ddtrace/internal/processor/trace.py b/ddtrace/internal/processor/trace.py index 475fd23c462..886df210079 100644 --- a/ddtrace/internal/processor/trace.py +++ b/ddtrace/internal/processor/trace.py @@ -265,15 +265,21 @@ def shutdown(self, timeout): :type timeout: :obj:`int` | :obj:`float` | :obj:`None` """ if self._span_metrics["spans_created"] or self._span_metrics["spans_finished"]: - # on_span_start queue span created counts in batches of 100. This ensures all remaining counts are sent - # before the tracer is shutdown. - self._queue_span_count_metrics("spans_created", "integration_name", None) - # on_span_finish(...) queues span finish metrics in batches of 100. - # This ensures all remaining counts are sent before the tracer is shutdown. 
- self._queue_span_count_metrics("spans_finished", "integration_name", None) - # The telemetry metrics writer can be shutdown before the tracer. - # This ensures all tracer metrics always sent. - telemetry_writer.periodic(True) + if config._telemetry_enabled: + # Telemetry writer is disabled when a process shuts down. This is to support py3.12. + # Here we submit the remaining span creation metrics without restarting the periodic thread. + # Note - Due to how atexit hooks are registered the telemetry writer is shutdown before the tracer. + telemetry_writer._is_periodic = False + telemetry_writer._enabled = True + # on_span_start queue span created counts in batches of 100. This ensures all remaining counts are sent + # before the tracer is shutdown. + self._queue_span_count_metrics("spans_created", "integration_name", None) + # on_span_finish(...) queues span finish metrics in batches of 100. + # This ensures all remaining counts are sent before the tracer is shutdown. + self._queue_span_count_metrics("spans_finished", "integration_name", None) + telemetry_writer.periodic(True) + # Disable the telemetry writer so no events/metrics/logs are queued during process shutdown + telemetry_writer.disable() try: self._writer.stop(timeout) diff --git a/ddtrace/internal/telemetry/__init__.py b/ddtrace/internal/telemetry/__init__.py index e8d4395a90c..eba59ac504a 100644 --- a/ddtrace/internal/telemetry/__init__.py +++ b/ddtrace/internal/telemetry/__init__.py @@ -49,12 +49,11 @@ def _excepthook(tp, value, root_traceback): error_msg = "{}:{} {}".format(filename, lineno, str(value)) telemetry_writer.add_integration(integration_name, True, error_msg=error_msg) - if telemetry_writer.started is False: + if not telemetry_writer.started: telemetry_writer._app_started_event(False) telemetry_writer._app_dependencies_loaded_event() telemetry_writer.app_shutdown() - telemetry_writer.disable() return _ORIGINAL_EXCEPTHOOK(tp, value, root_traceback) diff --git 
a/ddtrace/internal/telemetry/writer.py b/ddtrace/internal/telemetry/writer.py index a6b29f2e298..8071faf61ec 100644 --- a/ddtrace/internal/telemetry/writer.py +++ b/ddtrace/internal/telemetry/writer.py @@ -189,7 +189,7 @@ def __init__(self, is_periodic=True): self._error = (0, "") # type: Tuple[int, str] self._namespace = MetricNamespace() self._logs = set() # type: Set[Dict[str, Any]] - self._disabled = False + self._enabled = config._telemetry_enabled self._forked = False # type: bool self._events_queue = [] # type: List[Dict] self._configuration_queue = {} # type: Dict[str, Dict] @@ -208,7 +208,7 @@ def enable(self): Enable the instrumentation telemetry collection service. If the service has already been activated before, this method does nothing. Use ``disable`` to turn off the telemetry collection service. """ - if not config._telemetry_enabled: + if not self._enabled: return False if self.status == ServiceStatus.RUNNING: @@ -227,7 +227,7 @@ def disable(self): Disable the telemetry collection service and drop the existing integrations and events Once disabled, telemetry collection can not be re-enabled. """ - self._disabled = True + self._enabled = False self.reset_queues() if self._is_periodic and self.status is ServiceStatus.RUNNING: self.stop() @@ -243,7 +243,7 @@ def add_event(self, payload, payload_type): :param str payload_type: The payload_type denotes the type of telmetery request. 
Payload types accepted by telemetry/proxy v2: app-started, app-closing, app-integrations-change """ - if not self._disabled and self.enable(): + if self.enable(): event = { "tracer_time": int(time.time()), "runtime_id": get_runtime_id(), @@ -292,7 +292,7 @@ def add_error(self, code, msg, filename, line_number): def _app_started_event(self, register_app_shutdown=True): # type: (bool) -> None """Sent when TelemetryWriter is enabled or forks""" - if self._forked: + if self._forked or self.started: # app-started events should only be sent by the main process return # List of configurations to be collected @@ -599,6 +599,7 @@ def periodic(self, force_flush=False): def app_shutdown(self): self._app_closing_event() self.periodic(force_flush=True) + self.disable() def reset_queues(self): # type: () -> None @@ -624,7 +625,8 @@ def _fork_writer(self): if self.status == ServiceStatus.STOPPED: return - self.stop(join=False) + if self._is_periodic: + self.stop(join=False) # Enable writer service in child process to avoid interpreter shutdown # error in Python 3.12 diff --git a/ddtrace/internal/writer/writer.py b/ddtrace/internal/writer/writer.py index 1a04d27640c..ee6679a5a73 100644 --- a/ddtrace/internal/writer/writer.py +++ b/ddtrace/internal/writer/writer.py @@ -622,8 +622,9 @@ def _send_payload(self, payload, count, client): def start(self): super(AgentWriter, self).start() try: - telemetry_writer._app_started_event() - telemetry_writer._app_dependencies_loaded_event() + if not telemetry_writer.started: + telemetry_writer._app_started_event() + telemetry_writer._app_dependencies_loaded_event() # appsec remote config should be enabled/started after the global tracer and configs # are initialized diff --git a/tests/appsec/conftest.py b/tests/appsec/conftest.py index fdc141e2c63..6b379ca54f4 100644 --- a/tests/appsec/conftest.py +++ b/tests/appsec/conftest.py @@ -8,8 +8,8 @@ @pytest.fixture def mock_telemetry_lifecycle_writer(): - telemetry_writer.disable() 
telemetry_writer.enable() + telemetry_writer.reset_queues() metrics_result = telemetry_writer._namespace._metrics_data assert len(metrics_result[TELEMETRY_TYPE_GENERATE_METRICS][TELEMETRY_NAMESPACE_TAG_APPSEC]) == 0 assert len(metrics_result[TELEMETRY_TYPE_DISTRIBUTION][TELEMETRY_NAMESPACE_TAG_APPSEC]) == 0 @@ -19,8 +19,8 @@ def mock_telemetry_lifecycle_writer(): @pytest.fixture def mock_logs_telemetry_lifecycle_writer(): - telemetry_writer.disable() telemetry_writer.enable() + telemetry_writer.reset_queues() metrics_result = telemetry_writer._logs assert len(metrics_result) == 0 yield telemetry_writer diff --git a/tests/telemetry/app.py b/tests/telemetry/app.py index 93bc58e7567..d3c1ef1aa8e 100644 --- a/tests/telemetry/app.py +++ b/tests/telemetry/app.py @@ -12,6 +12,14 @@ def index(): return "OK", 200 +@app.route("/start_application") +def starting_app_view(): + # We must call app-started before telemetry events can be sent to the agent. + # This endpoint mocks the behavior of the agent writer. 
+ telemetry_writer._app_started_event() + return "OK", 200 + + @app.route("/count_metric") def metrics_view(): telemetry_writer.add_count_metric( diff --git a/tests/telemetry/test_telemetry.py b/tests/telemetry/test_telemetry.py index bc7ee6b2b7b..157bfb8746a 100644 --- a/tests/telemetry/test_telemetry.py +++ b/tests/telemetry/test_telemetry.py @@ -245,6 +245,22 @@ def test_app_started_error_unhandled_exception(test_agent_session, run_python_co assert "Unable to parse DD_SPAN_SAMPLING_RULES='invalid_rules'" in events[2]["payload"]["error"]["message"] +def test_telemetry_with_raised_exception(test_agent_session, run_python_code_in_subprocess): + env = os.environ.copy() + _, stderr, status, _ = run_python_code_in_subprocess( + "import ddtrace; ddtrace.tracer.trace('moon').finish(); raise Exception('bad_code')", env=env + ) + assert status == 1, stderr + assert b"bad_code" in stderr + # Regression test for python3.12 support + assert b"RuntimeError: can't create new thread at interpreter shutdown" not in stderr + + # Ensure the expected telemetry events are sent + events = test_agent_session.get_events() + event_types = [event["request_type"] for event in events] + assert event_types == ["generate-metrics", "app-closing", "app-dependencies-loaded", "app-started"] + + def test_handled_integration_error(test_agent_session, run_python_code_in_subprocess): code = """ import logging @@ -283,17 +299,20 @@ def test_handled_integration_error(test_agent_session, run_python_code_in_subpro == "failed to import ddtrace module 'ddtrace.contrib.sqlite3' when patching on import" ) - metric_events = [ - event - for event in events - if event["request_type"] == "generate-metrics" - and event["payload"]["series"][0]["metric"] == "integration_errors" - ] - assert len(metric_events) == 1 - assert len(metric_events[0]["payload"]["series"]) == 1 - assert metric_events[0]["payload"]["series"][0]["type"] == "count" - assert len(metric_events[0]["payload"]["series"][0]["points"]) == 1 - 
assert metric_events[0]["payload"]["series"][0]["points"][0][1] == 1 + # Get metric containing the integration error + integration_error = {} + for event in events: + if event["request_type"] == "generate-metrics": + for metric in event["payload"]["series"]: + if metric["metric"] == "integration_errors": + integration_error = metric + break + + # assert the integration metric has the correct type, count, and tags + assert integration_error + assert integration_error["type"] == "count" + assert integration_error["points"][0][1] == 1 + assert integration_error["tags"] == ["integration_name:sqlite3", "error_type:attributeerror"] def test_unhandled_integration_error(test_agent_session, run_python_code_in_subprocess): diff --git a/tests/telemetry/test_writer.py b/tests/telemetry/test_writer.py index 2f26997e789..98b71fcf571 100644 --- a/tests/telemetry/test_writer.py +++ b/tests/telemetry/test_writer.py @@ -143,13 +143,6 @@ def test_app_started_event_configuration_override(test_agent_session, run_python logging.basicConfig() import ddtrace.auto - -from ddtrace.internal.telemetry import telemetry_writer -telemetry_writer.enable() -telemetry_writer.reset_queues() -telemetry_writer._app_started_event() -telemetry_writer.periodic(force_flush=True) -telemetry_writer.disable() """ env = os.environ.copy() @@ -204,10 +197,11 @@ def test_app_started_event_configuration_override(test_agent_session, run_python assert status == 0, stderr events = test_agent_session.get_events() + app_started_events = [event for event in events if event["request_type"] == "app-started"] + assert len(app_started_events) == 1 - assert len(events) == 1 - events[0]["payload"]["configuration"].sort(key=lambda c: c["name"]) - assert events[0]["payload"]["configuration"] == [ + app_started_events[0]["payload"]["configuration"].sort(key=lambda c: c["name"]) + assert app_started_events[0]["payload"]["configuration"] == [ {"name": "DD_AGENT_HOST", "origin": "unknown", "value": None}, {"name": "DD_AGENT_PORT", 
"origin": "unknown", "value": None}, {"name": "DD_APPSEC_ENABLED", "origin": "unknown", "value": False},