fix(telemetry): support python3.12 [backport 2.0] (#7055)
Backport 065784b from #7043 to 2.0.

## Description

This PR ensures the instrumentation telemetry client is compatible with
python3.12's threading module by:
- Ensuring the telemetry writer thread is only started once per
application.
- Ensuring the telemetry writer thread is disabled when an application
shuts down.
- Ensuring telemetry metrics are queued in `SpanAggregator.shutdown` without
restarting the telemetry writer thread.
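
The three guarantees above can be illustrated with a small stand-in class. This is a minimal sketch, not ddtrace's `TelemetryWriter`: the class `OnceWriter` and every method and attribute on it are hypothetical names chosen for illustration. It shows a background thread that is started at most once, a `disable()` that is permanent, and a final flush that runs on the calling thread instead of restarting the worker.

```python
import threading


class OnceWriter:
    """Hypothetical stand-in for a periodic writer; not ddtrace's TelemetryWriter."""

    def __init__(self, interval=0.05):
        self._interval = interval
        self._enabled = True   # flipped to False permanently by disable()
        self._thread = None    # created at most once
        self._stop = threading.Event()
        self._lock = threading.Lock()
        self._queue = []
        self.sent = []         # records what was "sent", in order
        self.thread_starts = 0

    def enable(self):
        """Start the background thread once; refuse after disable()."""
        with self._lock:
            if not self._enabled:
                return False
            if self._thread is None:
                self._thread = threading.Thread(target=self._run, daemon=True)
                self._thread.start()
                self.thread_starts += 1
            return True

    def add_metric(self, name, value):
        """Queue a metric only while the writer is enabled."""
        if not self.enable():
            return False
        with self._lock:
            self._queue.append((name, value))
        return True

    def flush(self):
        """Send queued items synchronously on the calling thread."""
        with self._lock:
            batch, self._queue = self._queue, []
        self.sent.extend(batch)

    def _run(self):
        # Periodic loop: flush until disable() sets the stop event.
        while not self._stop.wait(self._interval):
            self.flush()

    def disable(self):
        """Stop the thread, flush what is left, and never restart."""
        with self._lock:
            self._enabled = False
            thread = self._thread
        self._stop.set()
        if thread is not None:
            thread.join()
        self.flush()  # final flush happens without any new thread
```

Calling `add_metric` after `disable()` returns `False` and does not start a thread, which is the property the shutdown path needs when the interpreter is already finalizing.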

## Motivation

The following change failed to support the telemetry client in
python3.12: #6859. This PR
will hopefully fix this 🤞.

### Reproduction for python3.12 runtime errors
```
docker run --rm -it python:3.12.0rc3 bash
root@a1f3c1d307ec:/# pip install ddtrace==2.0.0rc2
root@a1f3c1d307ec:/# python -c "import ddtrace; _ = ddtrace.tracer.trace('foo'); raise Exception"
```

### Output
```
Traceback (most recent call last):
  File "<string>", line 1, in <module>
Exception
Exception ignored in atexit callback: <bound method Tracer._atexit of <ddtrace.tracer.Tracer object at 0xffffbd967260>>
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/ddtrace/tracer.py", line 293, in _atexit
    self.shutdown(timeout=self.SHUTDOWN_TIMEOUT)
  File "/usr/local/lib/python3.12/site-packages/ddtrace/tracer.py", line 1024, in shutdown
    processor.shutdown(timeout)
  File "/usr/local/lib/python3.12/site-packages/ddtrace/internal/processor/trace.py", line 270, in shutdown
    self._queue_span_count_metrics("spans_created", "integration_name", None)
  File "/usr/local/lib/python3.12/site-packages/ddtrace/internal/processor/trace.py", line 291, in _queue_span_count_metrics
    telemetry_writer.add_count_metric(
  File "/usr/local/lib/python3.12/site-packages/ddtrace/internal/telemetry/writer.py", line 514, in add_count_metric
    if self.status == ServiceStatus.RUNNING or self.enable():
                                               ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/ddtrace/internal/telemetry/writer.py", line 218, in enable
    self.start()
  File "/usr/local/lib/python3.12/site-packages/ddtrace/internal/service.py", line 58, in start
    self._start_service(*args, **kwargs)
  File "/usr/local/lib/python3.12/site-packages/ddtrace/internal/periodic.py", line 135, in _start_service
    self._worker.start()
  File "/usr/local/lib/python3.12/threading.py", line 971, in start
    _start_new_thread(self._bootstrap, ())
RuntimeError: can't create new thread at interpreter shutdown
```
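
The last frames of the traceback do not depend on ddtrace: a thread is started from an `atexit` callback. The following is a minimal sketch using only the standard library, under the assumption that the behaviour is the same outside ddtrace. The helper name `run_atexit_thread_start` and the `CHILD` program are hypothetical. On python3.12 the child is expected to print the same `RuntimeError` as above; on earlier versions it is expected to exit quietly. No output is asserted here because it depends on the interpreter version.

```python
import subprocess
import sys

# Child program: an atexit callback that starts a new thread, which is what
# the telemetry writer's enable() ends up doing in the traceback above.
CHILD = """
import atexit
import threading

def start_thread_at_exit():
    threading.Thread(target=lambda: None).start()

atexit.register(start_thread_at_exit)
"""


def run_atexit_thread_start():
    """Run CHILD in a fresh interpreter and return (returncode, stderr)."""
    proc = subprocess.run(
        [sys.executable, "-c", CHILD],
        capture_output=True,
        text=True,
        timeout=60,
    )
    return proc.returncode, proc.stderr


if __name__ == "__main__":
    code, err = run_atexit_thread_start()
    print("exit code:", code)
    print(err or "(no stderr output)")
```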

## Risk

~~This PR reverts an optimization that ensured telemetry span creation
metrics were queued in batches of 100. Without this optimization we can
expect a 5-10% increase to span creation and span finish.~~


## Checklist

- [x] Change(s) are motivated and described in the PR description.
- [x] Testing strategy is described if automated tests are not included
in the PR.
- [x] Risk is outlined (performance impact, potential for breakage,
maintainability, etc).
- [x] Change is maintainable (easy to change, telemetry, documentation).
- [x] [Library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
are followed. If no release note is required, add label
`changelog/no-changelog`.
- [x] Documentation is included (in-code, generated user docs, [public
corp docs](https://github.com/DataDog/documentation/)).
- [x] Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist

- [x] Title is accurate.
- [x] No unnecessary changes are introduced.
- [x] Description motivates each change.
- [x] Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes unless absolutely necessary.
- [x] Testing strategy adequately addresses listed risk(s).
- [x] Change is maintainable (easy to change, telemetry, documentation).
- [x] Release note makes sense to a user of the library.
- [x] Reviewer has explicitly acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment.
- [x] Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)
- [x] If this PR touches code that signs or publishes builds or
packages, or handles credentials of any kind, I've requested a review
from `@DataDog/security-design-and-guidance`.
- [x] This PR doesn't touch any of that.

Co-authored-by: Munir Abdinur <munir.abdinur@datadoghq.com>
github-actions[bot] and mabdinur authored Sep 26, 2023
1 parent 5e48006 commit 176c4b4
Showing 8 changed files with 71 additions and 42 deletions.
24 changes: 15 additions & 9 deletions ddtrace/internal/processor/trace.py
@@ -265,15 +265,21 @@ def shutdown(self, timeout):
 :type timeout: :obj:`int` | :obj:`float` | :obj:`None`
 """
 if self._span_metrics["spans_created"] or self._span_metrics["spans_finished"]:
-# on_span_start queue span created counts in batches of 100. This ensures all remaining counts are sent
-# before the tracer is shutdown.
-self._queue_span_count_metrics("spans_created", "integration_name", None)
-# on_span_finish(...) queues span finish metrics in batches of 100.
-# This ensures all remaining counts are sent before the tracer is shutdown.
-self._queue_span_count_metrics("spans_finished", "integration_name", None)
-# The telemetry metrics writer can be shutdown before the tracer.
-# This ensures all tracer metrics always sent.
-telemetry_writer.periodic(True)
+if config._telemetry_enabled:
+# Telemetry writer is disabled when a process shutsdown. This is to support py3.12.
+# Here we submit the remanining span creation metrics without restarting the periodic thread.
+# Note - Due to how atexit hooks are registered the telemetry writer is shutdown before the tracer.
+telemetry_writer._is_periodic = False
+telemetry_writer._enabled = True
+# on_span_start queue span created counts in batches of 100. This ensures all remaining counts are sent
+# before the tracer is shutdown.
+self._queue_span_count_metrics("spans_created", "integration_name", None)
+# on_span_finish(...) queues span finish metrics in batches of 100.
+# This ensures all remaining counts are sent before the tracer is shutdown.
+self._queue_span_count_metrics("spans_finished", "integration_name", None)
+telemetry_writer.periodic(True)
+# Disable the telemetry writer so no events/metrics/logs are queued during process shutdown
+telemetry_writer.disable()

 try:
 self._writer.stop(timeout)
3 changes: 1 addition & 2 deletions ddtrace/internal/telemetry/__init__.py
@@ -49,12 +49,11 @@ def _excepthook(tp, value, root_traceback):
 error_msg = "{}:{} {}".format(filename, lineno, str(value))
 telemetry_writer.add_integration(integration_name, True, error_msg=error_msg)

-if telemetry_writer.started is False:
+if not telemetry_writer.started:
 telemetry_writer._app_started_event(False)
 telemetry_writer._app_dependencies_loaded_event()

 telemetry_writer.app_shutdown()
-telemetry_writer.disable()

 return _ORIGINAL_EXCEPTHOOK(tp, value, root_traceback)

14 changes: 8 additions & 6 deletions ddtrace/internal/telemetry/writer.py
@@ -189,7 +189,7 @@ def __init__(self, is_periodic=True):
 self._error = (0, "") # type: Tuple[int, str]
 self._namespace = MetricNamespace()
 self._logs = set() # type: Set[Dict[str, Any]]
-self._disabled = False
+self._enabled = config._telemetry_enabled
 self._forked = False # type: bool
 self._events_queue = [] # type: List[Dict]
 self._configuration_queue = {} # type: Dict[str, Dict]
@@ -208,7 +208,7 @@ def enable(self):
 Enable the instrumentation telemetry collection service. If the service has already been
 activated before, this method does nothing. Use ``disable`` to turn off the telemetry collection service.
 """
-if not config._telemetry_enabled:
+if not self._enabled:
 return False

 if self.status == ServiceStatus.RUNNING:
@@ -227,7 +227,7 @@ def disable(self):
 Disable the telemetry collection service and drop the existing integrations and events
 Once disabled, telemetry collection can not be re-enabled.
 """
-self._disabled = True
+self._enabled = False
 self.reset_queues()
 if self._is_periodic and self.status is ServiceStatus.RUNNING:
 self.stop()
@@ -243,7 +243,7 @@ def add_event(self, payload, payload_type):
 :param str payload_type: The payload_type denotes the type of telmetery request.
 Payload types accepted by telemetry/proxy v2: app-started, app-closing, app-integrations-change
 """
-if not self._disabled and self.enable():
+if self.enable():
 event = {
 "tracer_time": int(time.time()),
 "runtime_id": get_runtime_id(),
@@ -292,7 +292,7 @@ def add_error(self, code, msg, filename, line_number):
 def _app_started_event(self, register_app_shutdown=True):
 # type: (bool) -> None
 """Sent when TelemetryWriter is enabled or forks"""
-if self._forked:
+if self._forked or self.started:
 # app-started events should only be sent by the main process
 return
 # List of configurations to be collected
@@ -599,6 +599,7 @@ def periodic(self, force_flush=False):
 def app_shutdown(self):
 self._app_closing_event()
 self.periodic(force_flush=True)
+self.disable()

 def reset_queues(self):
 # type: () -> None
@@ -624,7 +625,8 @@ def _fork_writer(self):
 if self.status == ServiceStatus.STOPPED:
 return

-self.stop(join=False)
+if self._is_periodic:
+self.stop(join=False)

 # Enable writer service in child process to avoid interpreter shutdown
 # error in Python 3.12
5 changes: 3 additions & 2 deletions ddtrace/internal/writer/writer.py
@@ -622,8 +622,9 @@ def _send_payload(self, payload, count, client):
 def start(self):
 super(AgentWriter, self).start()
 try:
-telemetry_writer._app_started_event()
-telemetry_writer._app_dependencies_loaded_event()
+if not telemetry_writer.started:
+telemetry_writer._app_started_event()
+telemetry_writer._app_dependencies_loaded_event()

 # appsec remote config should be enabled/started after the global tracer and configs
 # are initialized
4 changes: 2 additions & 2 deletions tests/appsec/conftest.py
@@ -8,8 +8,8 @@

 @pytest.fixture
 def mock_telemetry_lifecycle_writer():
-telemetry_writer.disable()
 telemetry_writer.enable()
+telemetry_writer.reset_queues()
 metrics_result = telemetry_writer._namespace._metrics_data
 assert len(metrics_result[TELEMETRY_TYPE_GENERATE_METRICS][TELEMETRY_NAMESPACE_TAG_APPSEC]) == 0
 assert len(metrics_result[TELEMETRY_TYPE_DISTRIBUTION][TELEMETRY_NAMESPACE_TAG_APPSEC]) == 0
@@ -19,8 +19,8 @@ def mock_telemetry_lifecycle_writer():

 @pytest.fixture
 def mock_logs_telemetry_lifecycle_writer():
-telemetry_writer.disable()
 telemetry_writer.enable()
+telemetry_writer.reset_queues()
 metrics_result = telemetry_writer._logs
 assert len(metrics_result) == 0
 yield telemetry_writer
8 changes: 8 additions & 0 deletions tests/telemetry/app.py
@@ -12,6 +12,14 @@ def index():
 return "OK", 200


+@app.route("/start_application")
+def starting_app_view():
+# We must call app-started before telemetry events can be sent to the agent.
+# This endpoint mocks the behavior of the agent writer.
+telemetry_writer._app_started_event()
+return "OK", 200
+
+
 @app.route("/count_metric")
 def metrics_view():
 telemetry_writer.add_count_metric(
41 changes: 30 additions & 11 deletions tests/telemetry/test_telemetry.py
@@ -245,6 +245,22 @@ def test_app_started_error_unhandled_exception(test_agent_session, run_python_co
 assert "Unable to parse DD_SPAN_SAMPLING_RULES='invalid_rules'" in events[2]["payload"]["error"]["message"]


+def test_telemetry_with_raised_exception(test_agent_session, run_python_code_in_subprocess):
+env = os.environ.copy()
+_, stderr, status, _ = run_python_code_in_subprocess(
+"import ddtrace; ddtrace.tracer.trace('moon').finish(); raise Exception('bad_code')", env=env
+)
+assert status == 1, stderr
+assert b"bad_code" in stderr
+# Regression test for python3.12 support
+assert b"RuntimeError: can't create new thread at interpreter shutdown" not in stderr
+
+# Ensure the expected telemetry events are sent
+events = test_agent_session.get_events()
+event_types = [event["request_type"] for event in events]
+assert event_types == ["generate-metrics", "app-closing", "app-dependencies-loaded", "app-started"]
+
+
 def test_handled_integration_error(test_agent_session, run_python_code_in_subprocess):
 code = """
 import logging
@@ -283,17 +299,20 @@ def test_handled_integration_error(test_agent_session, run_python_code_in_subpro
 == "failed to import ddtrace module 'ddtrace.contrib.sqlite3' when patching on import"
 )

-metric_events = [
-event
-for event in events
-if event["request_type"] == "generate-metrics"
-and event["payload"]["series"][0]["metric"] == "integration_errors"
-]
-assert len(metric_events) == 1
-assert len(metric_events[0]["payload"]["series"]) == 1
-assert metric_events[0]["payload"]["series"][0]["type"] == "count"
-assert len(metric_events[0]["payload"]["series"][0]["points"]) == 1
-assert metric_events[0]["payload"]["series"][0]["points"][0][1] == 1
+# Get metric containing the integration error
+integration_error = {}
+for event in events:
+if event["request_type"] == "generate-metrics":
+for metric in event["payload"]["series"]:
+if metric["metric"] == "integration_errors":
+integration_error = metric
+break
+
+# assert the integration metric has the correct type, count, and tags
+assert integration_error
+assert integration_error["type"] == "count"
+assert integration_error["points"][0][1] == 1
+assert integration_error["tags"] == ["integration_name:sqlite3", "error_type:attributeerror"]


 def test_unhandled_integration_error(test_agent_session, run_python_code_in_subprocess):
14 changes: 4 additions & 10 deletions tests/telemetry/test_writer.py
@@ -143,13 +143,6 @@ def test_app_started_event_configuration_override(test_agent_session, run_python
 logging.basicConfig()
 import ddtrace.auto
 from ddtrace.internal.telemetry import telemetry_writer
-telemetry_writer.enable()
-telemetry_writer.reset_queues()
-telemetry_writer._app_started_event()
-telemetry_writer.periodic(force_flush=True)
-telemetry_writer.disable()
 """

 env = os.environ.copy()
@@ -204,10 +197,11 @@ def test_app_started_event_configuration_override(test_agent_session, run_python
 assert status == 0, stderr

 events = test_agent_session.get_events()
+app_started_events = [event for event in events if event["request_type"] == "app-started"]
+assert len(app_started_events) == 1

-assert len(events) == 1
-events[0]["payload"]["configuration"].sort(key=lambda c: c["name"])
-assert events[0]["payload"]["configuration"] == [
+app_started_events[0]["payload"]["configuration"].sort(key=lambda c: c["name"])
+assert app_started_events[0]["payload"]["configuration"] == [
 {"name": "DD_AGENT_HOST", "origin": "unknown", "value": None},
 {"name": "DD_AGENT_PORT", "origin": "unknown", "value": None},
 {"name": "DD_APPSEC_ENABLED", "origin": "unknown", "value": False},
