Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(telemetry): support python3.12 #7043

Merged
merged 14 commits into from
Sep 26, 2023
Merged
24 changes: 15 additions & 9 deletions ddtrace/internal/processor/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,15 +265,21 @@ def shutdown(self, timeout):
:type timeout: :obj:`int` | :obj:`float` | :obj:`None`
"""
if self._span_metrics["spans_created"] or self._span_metrics["spans_finished"]:
# on_span_start queue span created counts in batches of 100. This ensures all remaining counts are sent
# before the tracer is shutdown.
self._queue_span_count_metrics("spans_created", "integration_name", None)
# on_span_finish(...) queues span finish metrics in batches of 100.
# This ensures all remaining counts are sent before the tracer is shutdown.
self._queue_span_count_metrics("spans_finished", "integration_name", None)
# The telemetry metrics writer can be shutdown before the tracer.
# This ensures all tracer metrics always sent.
telemetry_writer.periodic(True)
if config._telemetry_enabled:
# Telemetry writer is disabled when a process shutsdown. This is to support py3.12.
# Here we submit the remanining span creation metrics without restarting the periodic thread.
# Note - Due to how atexit hooks are registered the telemetry writer is shutdown before the tracer.
telemetry_writer._is_periodic = False
telemetry_writer._enabled = True
# on_span_start queue span created counts in batches of 100. This ensures all remaining counts are sent
# before the tracer is shutdown.
self._queue_span_count_metrics("spans_created", "integration_name", None)
# on_span_finish(...) queues span finish metrics in batches of 100.
# This ensures all remaining counts are sent before the tracer is shutdown.
self._queue_span_count_metrics("spans_finished", "integration_name", None)
telemetry_writer.periodic(True)
# Disable the telemetry writer so no events/metrics/logs are queued during process shutdown
telemetry_writer.disable()

try:
self._writer.stop(timeout)
Expand Down
3 changes: 1 addition & 2 deletions ddtrace/internal/telemetry/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,11 @@ def _excepthook(tp, value, root_traceback):
error_msg = "{}:{} {}".format(filename, lineno, str(value))
telemetry_writer.add_integration(integration_name, True, error_msg=error_msg)

if telemetry_writer.started is False:
if not telemetry_writer.started:
telemetry_writer._app_started_event(False)
telemetry_writer._app_dependencies_loaded_event()

telemetry_writer.app_shutdown()
telemetry_writer.disable()

return _ORIGINAL_EXCEPTHOOK(tp, value, root_traceback)

Expand Down
14 changes: 8 additions & 6 deletions ddtrace/internal/telemetry/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ def __init__(self, is_periodic=True):
self._error = (0, "") # type: Tuple[int, str]
self._namespace = MetricNamespace()
self._logs = set() # type: Set[Dict[str, Any]]
self._disabled = False
self._enabled = config._telemetry_enabled
self._forked = False # type: bool
self._events_queue = [] # type: List[Dict]
self._configuration_queue = {} # type: Dict[str, Dict]
Expand All @@ -208,7 +208,7 @@ def enable(self):
Enable the instrumentation telemetry collection service. If the service has already been
activated before, this method does nothing. Use ``disable`` to turn off the telemetry collection service.
"""
if not config._telemetry_enabled:
if not self._enabled:
return False

if self.status == ServiceStatus.RUNNING:
Expand All @@ -227,7 +227,7 @@ def disable(self):
Disable the telemetry collection service and drop the existing integrations and events
Once disabled, telemetry collection can not be re-enabled.
"""
self._disabled = True
self._enabled = False
self.reset_queues()
if self._is_periodic and self.status is ServiceStatus.RUNNING:
self.stop()
Expand All @@ -243,7 +243,7 @@ def add_event(self, payload, payload_type):
:param str payload_type: The payload_type denotes the type of telmetery request.
Payload types accepted by telemetry/proxy v2: app-started, app-closing, app-integrations-change
"""
if not self._disabled and self.enable():
if self.enable():
event = {
"tracer_time": int(time.time()),
"runtime_id": get_runtime_id(),
Expand Down Expand Up @@ -292,7 +292,7 @@ def add_error(self, code, msg, filename, line_number):
def _app_started_event(self, register_app_shutdown=True):
# type: (bool) -> None
"""Sent when TelemetryWriter is enabled or forks"""
if self._forked:
if self._forked or self.started:
# app-started events should only be sent by the main process
return
# List of configurations to be collected
Expand Down Expand Up @@ -599,6 +599,7 @@ def periodic(self, force_flush=False):
def app_shutdown(self):
self._app_closing_event()
self.periodic(force_flush=True)
self.disable()

def reset_queues(self):
# type: () -> None
Expand All @@ -624,7 +625,8 @@ def _fork_writer(self):
if self.status == ServiceStatus.STOPPED:
return

self.stop(join=False)
if self._is_periodic:
self.stop(join=False)

# Enable writer service in child process to avoid interpreter shutdown
# error in Python 3.12
Expand Down
5 changes: 3 additions & 2 deletions ddtrace/internal/writer/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -622,8 +622,9 @@ def _send_payload(self, payload, count, client):
def start(self):
super(AgentWriter, self).start()
try:
telemetry_writer._app_started_event()
telemetry_writer._app_dependencies_loaded_event()
if not telemetry_writer.started:
telemetry_writer._app_started_event()
telemetry_writer._app_dependencies_loaded_event()

# appsec remote config should be enabled/started after the global tracer and configs
# are initialized
Expand Down
4 changes: 2 additions & 2 deletions tests/appsec/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@

@pytest.fixture
def mock_telemetry_lifecycle_writer():
telemetry_writer.disable()
telemetry_writer.enable()
telemetry_writer.reset_queues()
metrics_result = telemetry_writer._namespace._metrics_data
assert len(metrics_result[TELEMETRY_TYPE_GENERATE_METRICS][TELEMETRY_NAMESPACE_TAG_APPSEC]) == 0
assert len(metrics_result[TELEMETRY_TYPE_DISTRIBUTION][TELEMETRY_NAMESPACE_TAG_APPSEC]) == 0
Expand All @@ -19,8 +19,8 @@ def mock_telemetry_lifecycle_writer():

@pytest.fixture
def mock_logs_telemetry_lifecycle_writer():
telemetry_writer.disable()
telemetry_writer.enable()
telemetry_writer.reset_queues()
metrics_result = telemetry_writer._logs
assert len(metrics_result) == 0
yield telemetry_writer
Expand Down
8 changes: 8 additions & 0 deletions tests/telemetry/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ def index():
return "OK", 200


@app.route("/start_application")
def starting_app_view():
# We must call app-started before telemetry events can be sent to the agent.
# This endpoint mocks the behavior of the agent writer.
telemetry_writer._app_started_event()
return "OK", 200


@app.route("/count_metric")
def metrics_view():
telemetry_writer.add_count_metric(
Expand Down
41 changes: 30 additions & 11 deletions tests/telemetry/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,22 @@ def test_app_started_error_unhandled_exception(test_agent_session, run_python_co
assert "Unable to parse DD_SPAN_SAMPLING_RULES='invalid_rules'" in events[2]["payload"]["error"]["message"]


def test_telemetry_with_raised_exception(test_agent_session, run_python_code_in_subprocess):
env = os.environ.copy()
_, stderr, status, _ = run_python_code_in_subprocess(
"import ddtrace; ddtrace.tracer.trace('moon').finish(); raise Exception('bad_code')", env=env
)
assert status == 1, stderr
assert b"bad_code" in stderr
# Regression test for python3.12 support
assert b"RuntimeError: can't create new thread at interpreter shutdown" not in stderr

# Ensure the expected telemetry events are sent
events = test_agent_session.get_events()
event_types = [event["request_type"] for event in events]
assert event_types == ["generate-metrics", "app-closing", "app-dependencies-loaded", "app-started"]


def test_handled_integration_error(test_agent_session, run_python_code_in_subprocess):
code = """
import logging
Expand Down Expand Up @@ -283,17 +299,20 @@ def test_handled_integration_error(test_agent_session, run_python_code_in_subpro
== "failed to import ddtrace module 'ddtrace.contrib.sqlite3' when patching on import"
)

metric_events = [
event
for event in events
if event["request_type"] == "generate-metrics"
and event["payload"]["series"][0]["metric"] == "integration_errors"
]
assert len(metric_events) == 1
assert len(metric_events[0]["payload"]["series"]) == 1
assert metric_events[0]["payload"]["series"][0]["type"] == "count"
assert len(metric_events[0]["payload"]["series"][0]["points"]) == 1
assert metric_events[0]["payload"]["series"][0]["points"][0][1] == 1
# Get metric containing the integration error
integration_error = {}
for event in events:
if event["request_type"] == "generate-metrics":
for metric in event["payload"]["series"]:
if metric["metric"] == "integration_errors":
integration_error = metric
break

# assert the integration metric has the correct type, count, and tags
assert integration_error
assert integration_error["type"] == "count"
assert integration_error["points"][0][1] == 1
assert integration_error["tags"] == ["integration_name:sqlite3", "error_type:attributeerror"]


def test_unhandled_integration_error(test_agent_session, run_python_code_in_subprocess):
Expand Down
14 changes: 4 additions & 10 deletions tests/telemetry/test_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,6 @@ def test_app_started_event_configuration_override(test_agent_session, run_python
logging.basicConfig()

import ddtrace.auto

from ddtrace.internal.telemetry import telemetry_writer
telemetry_writer.enable()
telemetry_writer.reset_queues()
telemetry_writer._app_started_event()
telemetry_writer.periodic(force_flush=True)
telemetry_writer.disable()
"""

env = os.environ.copy()
Expand Down Expand Up @@ -204,10 +197,11 @@ def test_app_started_event_configuration_override(test_agent_session, run_python
assert status == 0, stderr

events = test_agent_session.get_events()
app_started_events = [event for event in events if event["request_type"] == "app-started"]
assert len(app_started_events) == 1

assert len(events) == 1
events[0]["payload"]["configuration"].sort(key=lambda c: c["name"])
assert events[0]["payload"]["configuration"] == [
app_started_events[0]["payload"]["configuration"].sort(key=lambda c: c["name"])
assert app_started_events[0]["payload"]["configuration"] == [
{"name": "DD_AGENT_HOST", "origin": "unknown", "value": None},
{"name": "DD_AGENT_PORT", "origin": "unknown", "value": None},
{"name": "DD_APPSEC_ENABLED", "origin": "unknown", "value": False},
Expand Down
Loading