fix(telemetry): support python3.12 #7043

Merged 14 commits on Sep 26, 2023
44 changes: 6 additions & 38 deletions ddtrace/internal/processor/trace.py
@@ -2,7 +2,6 @@
from collections import defaultdict
from threading import Lock
from threading import RLock
from typing import Dict
from typing import Iterable
from typing import List
from typing import Optional
@@ -188,29 +187,22 @@ class _Trace(object):
_lock = attr.ib(init=False, factory=RLock, repr=False, type=Union[RLock, Lock])
else:
_lock = attr.ib(init=False, factory=Lock, repr=False, type=Union[RLock, Lock])
# Tracks the number of spans created and tags each count with the api that was used
# ex: otel api, opentracing api, datadog api
_span_metrics = attr.ib(
init=False,
factory=lambda: {
"spans_created": defaultdict(int),
"spans_finished": defaultdict(int),
},
type=Dict[str, DefaultDict],
)

def on_span_start(self, span):
# type: (Span) -> None
with self._lock:
trace = self._traces[span.trace_id]
trace.spans.append(span)
self._span_metrics["spans_created"][span._span_api] += 1
self._queue_span_count_metrics("spans_created", "integration_name")
telemetry_writer.add_count_metric(
TELEMETRY_NAMESPACE_TAG_TRACER, "spans_created", tags=(("integration_name", span._span_api),)
)

def on_span_finish(self, span):
# type: (Span) -> None
telemetry_writer.add_count_metric(
TELEMETRY_NAMESPACE_TAG_TRACER, "spans_finished", tags=(("integration_name", span._span_api),)
)
with self._lock:
self._span_metrics["spans_finished"][span._span_api] += 1
trace = self._traces[span.trace_id]
trace.num_finished += 1
should_partial_flush = self._partial_flush_enabled and trace.num_finished >= self._partial_flush_min_spans
@@ -247,7 +239,6 @@ def on_span_finish(self, span):
except Exception:
log.error("error applying processor %r", tp, exc_info=True)

self._queue_span_count_metrics("spans_finished", "integration_name")
self._writer.write(spans)
return

@@ -264,35 +255,12 @@ def shutdown(self, timeout):
before exiting or :obj:`None` to block until flushing has successfully completed (default: :obj:`None`)
:type timeout: :obj:`int` | :obj:`float` | :obj:`None`
"""
if self._span_metrics["spans_created"] or self._span_metrics["spans_finished"]:
# on_span_start queues span created counts in batches of 100. This ensures all remaining counts are sent
# before the tracer is shut down.
self._queue_span_count_metrics("spans_created", "integration_name", None)
# on_span_finish(...) queues span finish metrics in batches of 100.
# This ensures all remaining counts are sent before the tracer is shut down.
self._queue_span_count_metrics("spans_finished", "integration_name", None)
# The telemetry metrics writer can be shut down before the tracer.
# This ensures all tracer metrics are always sent.
telemetry_writer.periodic(True)

try:
self._writer.stop(timeout)
except ServiceStatusError:
# It's possible the writer never got started in the first place :(
pass

def _queue_span_count_metrics(self, metric_name, tag_name, min_count=100):
# type: (str, str, Optional[int]) -> None
"""Queues a telemetry count metric for span created and span finished"""
# perf: telemetry_metrics_writer.add_count_metric(...) is an expensive operation.
# We should avoid calling this method on every invocation of span finish and span start.
if min_count is None or sum(self._span_metrics[metric_name].values()) >= min_count:
for tag_value, count in self._span_metrics[metric_name].items():
telemetry_writer.add_count_metric(
TELEMETRY_NAMESPACE_TAG_TRACER, metric_name, count, tags=((tag_name, tag_value),)
)
self._span_metrics[metric_name] = defaultdict(int)


@attr.s
class SpanSamplingProcessor(SpanProcessor):
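For context on this hunk: the old code counted spans locally in _span_metrics and flushed them to the telemetry writer in batches of 100, while the new code calls telemetry_writer.add_count_metric(...) once per span start/finish and lets the writer do the aggregation. Below is a minimal sketch of the two strategies, using a hypothetical TelemetryClient as a stand-in for the real writer:

from collections import defaultdict


class TelemetryClient:
    """Hypothetical stand-in for ddtrace's telemetry writer."""

    def __init__(self):
        self.series = defaultdict(int)

    def add_count_metric(self, namespace, name, value=1, tags=()):
        # Real telemetry writers bucket counts per flush interval,
        # so per-call increments stay cheap to aggregate.
        self.series[(namespace, name, tags)] += value


class BatchedSpanCounter:
    """Old approach: count locally, flush once min_count is reached."""

    def __init__(self, client, min_count=100):
        self.client = client
        self.min_count = min_count
        self.counts = defaultdict(int)

    def on_span_created(self, api_name):
        self.counts[api_name] += 1
        if sum(self.counts.values()) >= self.min_count:
            self.flush()

    def flush(self):
        # Must also run at shutdown to drain any remainder.
        for api_name, count in self.counts.items():
            self.client.add_count_metric(
                "tracers", "spans_created", count, tags=(("integration_name", api_name),)
            )
        self.counts.clear()


class DirectSpanCounter:
    """New approach: one writer call per span, no shutdown bookkeeping."""

    def __init__(self, client):
        self.client = client

    def on_span_created(self, api_name):
        self.client.add_count_metric(
            "tracers", "spans_created", tags=(("integration_name", api_name),)
        )

The direct form trades one extra writer call per span for the removal of the shared _span_metrics state and its flush-on-shutdown bookkeeping, which is exactly what this diff deletes.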
3 changes: 1 addition & 2 deletions ddtrace/internal/telemetry/__init__.py
@@ -49,12 +49,11 @@ def _excepthook(tp, value, root_traceback):
error_msg = "{}:{} {}".format(filename, lineno, str(value))
telemetry_writer.add_integration(integration_name, True, error_msg=error_msg)

if telemetry_writer.started is False:
if not telemetry_writer.started:
telemetry_writer._app_started_event(False)
telemetry_writer._app_dependencies_loaded_event()

telemetry_writer.app_shutdown()
telemetry_writer.disable()

return _ORIGINAL_EXCEPTHOOK(tp, value, root_traceback)

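The _excepthook above flushes telemetry for crashes that happen before the writer ever started, then delegates to the original hook. A minimal sketch of that chaining pattern, with flush_crash_telemetry as a hypothetical stand-in for the _app_started_event(False) / _app_dependencies_loaded_event() / app_shutdown() sequence:

import sys

_ORIGINAL_EXCEPTHOOK = sys.excepthook


def flush_crash_telemetry(tp, value):
    # Stand-in for the telemetry_writer calls made in ddtrace's hook.
    sys.stderr.write("telemetry: recorded crash %s: %s\n" % (tp.__name__, value))


def _excepthook(tp, value, root_traceback):
    flush_crash_telemetry(tp, value)
    # Always delegate so the interpreter still prints the traceback.
    return _ORIGINAL_EXCEPTHOOK(tp, value, root_traceback)


sys.excepthook = _excepthook

Since app_shutdown() now calls disable() itself (see the writer.py hunk below), the standalone telemetry_writer.disable() call dropped from this hook would have been redundant.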
19 changes: 6 additions & 13 deletions ddtrace/internal/telemetry/writer.py
@@ -189,7 +189,7 @@ def __init__(self, is_periodic=True):
self._error = (0, "") # type: Tuple[int, str]
self._namespace = MetricNamespace()
self._logs = set() # type: Set[Dict[str, Any]]
self._disabled = False
self._enabled = config._telemetry_enabled
self._forked = False # type: bool
self._events_queue = [] # type: List[Dict]
self._configuration_queue = {} # type: Dict[str, Dict]
@@ -208,7 +208,7 @@ def enable(self):
Enable the instrumentation telemetry collection service. If the service has already been
activated before, this method does nothing. Use ``disable`` to turn off the telemetry collection service.
"""
if not config._telemetry_enabled:
if not self._enabled:
return False

if self.status == ServiceStatus.RUNNING:
@@ -227,7 +227,7 @@ def disable(self):
Disable the telemetry collection service and drop the existing integrations and events.
Once disabled, telemetry collection cannot be re-enabled.
"""
self._disabled = True
self._enabled = False
self.reset_queues()
if self._is_periodic and self.status is ServiceStatus.RUNNING:
self.stop()
@@ -243,7 +243,7 @@ def add_event(self, payload, payload_type):
:param str payload_type: The payload_type denotes the type of telemetry request.
Payload types accepted by telemetry/proxy v2: app-started, app-closing, app-integrations-change
"""
if not self._disabled and self.enable():
if self.enable():
event = {
"tracer_time": int(time.time()),
"runtime_id": get_runtime_id(),
@@ -292,7 +292,7 @@ def add_error(self, code, msg, filename, line_number):
def _app_started_event(self, register_app_shutdown=True):
# type: (bool) -> None
"""Sent when TelemetryWriter is enabled or forks"""
if self._forked:
if self._forked or self.started:
# app-started events should only be sent by the main process
return
# List of configurations to be collected
@@ -599,6 +599,7 @@ def periodic(self, force_flush=False):
def app_shutdown(self):
self._app_closing_event()
self.periodic(force_flush=True)
self.disable()

def reset_queues(self):
# type: () -> None
@@ -621,14 +622,6 @@ def _fork_writer(self):
# Avoid sending duplicate events.
# Queued events should be sent in the main process.
self.reset_queues()
if self.status == ServiceStatus.STOPPED:
return

self.stop(join=False)

# Enable writer service in child process to avoid interpreter shutdown
# error in Python 3.12
self.enable()

def _restart_sequence(self):
self._sequence = itertools.count(1)
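The deleted tail of _fork_writer stopped and eagerly re-enabled the writer in every forked child. This PR drops that restart in favor of only resetting the queues, letting the child start its writer lazily through the enable() call in add_event (the "if self.enable():" hunk above); the regression test in tests/telemetry/test_telemetry.py below checks that this no longer triggers Python 3.12's "RuntimeError: can't create new thread at interpreter shutdown". A sketch of the surviving fork handling, assuming os.register_at_fork wiring similar to ddtrace's service machinery:

import os


class ForkSafeTelemetryWriter:
    def __init__(self):
        self._events_queue = []
        self._forked = False
        # Run _fork_writer in the child process after every os.fork().
        os.register_at_fork(after_in_child=self._fork_writer)

    def _fork_writer(self):
        self._forked = True
        # Queued events belong to the parent; drop them so the child
        # does not send duplicates.
        self._events_queue.clear()
        # Crucially, no background thread is started here: the child
        # spins one up lazily, the first time it actually needs to send
        # an event, avoiding thread creation during interpreter shutdown.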
5 changes: 3 additions & 2 deletions ddtrace/internal/writer/writer.py
@@ -622,8 +622,9 @@ def _send_payload(self, payload, count, client):
def start(self):
super(AgentWriter, self).start()
try:
telemetry_writer._app_started_event()
telemetry_writer._app_dependencies_loaded_event()
if not telemetry_writer.started:
telemetry_writer._app_started_event()
telemetry_writer._app_dependencies_loaded_event()

# appsec remote config should be enabled/started after the global tracer and configs
# are initialized
4 changes: 2 additions & 2 deletions tests/appsec/conftest.py
@@ -8,8 +8,8 @@

@pytest.fixture
def mock_telemetry_lifecycle_writer():
telemetry_writer.disable()
telemetry_writer.enable()
telemetry_writer.reset_queues()
metrics_result = telemetry_writer._namespace._metrics_data
assert len(metrics_result[TELEMETRY_TYPE_GENERATE_METRICS][TELEMETRY_NAMESPACE_TAG_APPSEC]) == 0
assert len(metrics_result[TELEMETRY_TYPE_DISTRIBUTION][TELEMETRY_NAMESPACE_TAG_APPSEC]) == 0
@@ -19,8 +19,8 @@ def mock_telemetry_lifecycle_writer():

@pytest.fixture
def mock_logs_telemetry_lifecycle_writer():
telemetry_writer.disable()
telemetry_writer.enable()
telemetry_writer.reset_queues()
metrics_result = telemetry_writer._logs
assert len(metrics_result) == 0
yield telemetry_writer
8 changes: 8 additions & 0 deletions tests/telemetry/app.py
@@ -12,6 +12,14 @@ def index():
return "OK", 200


@app.route("/start_application")
def starting_app_view():
# We must call app-started before telemetry events can be sent to the agent.
# This endpoint mocks the behavior of the agent writer.
telemetry_writer._app_started_event()
return "OK", 200


@app.route("/count_metric")
def metrics_view():
telemetry_writer.add_count_metric(
41 changes: 30 additions & 11 deletions tests/telemetry/test_telemetry.py
@@ -245,6 +245,22 @@ def test_app_started_error_unhandled_exception(test_agent_session, run_python_co
assert "Unable to parse DD_SPAN_SAMPLING_RULES='invalid_rules'" in events[2]["payload"]["error"]["message"]


def test_telemetry_with_raised_exception(test_agent_session, run_python_code_in_subprocess):
env = os.environ.copy()
_, stderr, status, _ = run_python_code_in_subprocess(
"import ddtrace; ddtrace.tracer.trace('moon').finish(); raise Exception('bad_code')", env=env
)
assert status == 1, stderr
assert b"bad_code" in stderr
# Regression test for python3.12 support
assert b"RuntimeError: can't create new thread at interpreter shutdown" not in stderr

# Ensure the expected telemetry events are sent
events = test_agent_session.get_events()
event_types = [event["request_type"] for event in events]
assert event_types == ["generate-metrics", "app-closing", "app-dependencies-loaded", "app-started"]


def test_handled_integration_error(test_agent_session, run_python_code_in_subprocess):
code = """
import logging
@@ -283,17 +299,20 @@ def test_handled_integration_error(test_agent_session, run_python_code_in_subpro
== "failed to import ddtrace module 'ddtrace.contrib.sqlite3' when patching on import"
)

metric_events = [
event
for event in events
if event["request_type"] == "generate-metrics"
and event["payload"]["series"][0]["metric"] == "integration_errors"
]
assert len(metric_events) == 1
assert len(metric_events[0]["payload"]["series"]) == 1
assert metric_events[0]["payload"]["series"][0]["type"] == "count"
assert len(metric_events[0]["payload"]["series"][0]["points"]) == 1
assert metric_events[0]["payload"]["series"][0]["points"][0][1] == 1
# Find the metric containing the integration error
integration_error = {}
for event in events:
if event["request_type"] == "generate-metrics":
for metric in event["payload"]["series"]:
if metric["metric"] == "integration_errors":
integration_error = metric
break

# assert the integration metric has the correct type, count, and tags
assert integration_error
assert integration_error["type"] == "count"
assert integration_error["points"][0][1] == 1
assert integration_error["tags"] == ["integration_name:sqlite3", "error_type:attributeerror"]


def test_unhandled_integration_error(test_agent_session, run_python_code_in_subprocess):
17 changes: 6 additions & 11 deletions tests/telemetry/test_telemetry_metrics_e2e.py
@@ -83,6 +83,9 @@ def test_telemetry_metrics_enabled_on_gunicorn_child_process(test_agent_session)
with gunicorn_server(telemetry_metrics_enabled="true", token=token) as context:
_, gunicorn_client = context

response = gunicorn_client.get("/start_application")
assert response.status_code == 200

gunicorn_client.get("/count_metric")
gunicorn_client.get("/count_metric")
response = gunicorn_client.get("/count_metric")
@@ -191,18 +194,10 @@ def test_span_creation_no_finish(test_agent_session, ddtrace_run_python_code_in_
assert status == 0, stderr

events = test_agent_session.get_events()
metrics = get_metrics_from_events(events)
assert len(metrics) == 3

assert metrics[0]["metric"] == "spans_created"
assert metrics[0]["tags"] == ["integration_name:datadog"]
assert metrics[0]["points"][0][1] == 4
assert metrics[1]["metric"] == "spans_created"
assert metrics[1]["tags"] == ["integration_name:opentracing"]
assert metrics[1]["points"][0][1] == 4
assert metrics[2]["metric"] == "spans_created"
assert metrics[2]["tags"] == ["integration_name:otel"]
assert metrics[2]["points"][0][1] == 4
metrics = get_metrics_from_events(events)
# Telemetry events are only submitted after the first trace is finished and sent to the agent
assert len(metrics) == 0


def get_metrics_from_events(events):
14 changes: 4 additions & 10 deletions tests/telemetry/test_writer.py
@@ -143,13 +143,6 @@ def test_app_started_event_configuration_override(test_agent_session, run_python
logging.basicConfig()

import ddtrace.auto

from ddtrace.internal.telemetry import telemetry_writer
telemetry_writer.enable()
telemetry_writer.reset_queues()
telemetry_writer._app_started_event()
telemetry_writer.periodic(force_flush=True)
telemetry_writer.disable()
"""

env = os.environ.copy()
@@ -204,10 +197,11 @@ def test_app_started_event_configuration_override(test_agent_session, run_python
assert status == 0, stderr

events = test_agent_session.get_events()
app_started_events = [event for event in events if event["request_type"] == "app-started"]
assert len(app_started_events) == 1

assert len(events) == 1
events[0]["payload"]["configuration"].sort(key=lambda c: c["name"])
assert events[0]["payload"]["configuration"] == [
app_started_events[0]["payload"]["configuration"].sort(key=lambda c: c["name"])
assert app_started_events[0]["payload"]["configuration"] == [
{"name": "DD_AGENT_HOST", "origin": "unknown", "value": None},
{"name": "DD_AGENT_PORT", "origin": "unknown", "value": None},
{"name": "DD_APPSEC_ENABLED", "origin": "unknown", "value": False},
30 changes: 10 additions & 20 deletions tests/tracer/test_processors.py
@@ -375,37 +375,27 @@ def test_span_normalizator():


def test_span_creation_metrics():
"""Test that telemetry metrics are queued in batches of 100 and the remainder is sent on shutdown"""
"""Test that telemetry span creation metrics are queued"""
writer = DummyWriter()
aggr = SpanAggregator(partial_flush_enabled=False, partial_flush_min_spans=0, trace_processors=[], writer=writer)

with mock.patch("ddtrace.internal.processor.trace.telemetry_writer.add_count_metric") as mock_tm:
for _ in range(300):
span = Span("span", on_finish=[aggr.on_span_finish])
span = Span("span")
aggr.on_span_start(span)
span.finish()

span = Span("span", on_finish=[aggr.on_span_finish])
aggr.on_span_start(span)
span.finish()

mock_tm.assert_has_calls(
[
mock.call("tracers", "spans_created", 100, tags=(("integration_name", "datadog"),)),
mock.call("tracers", "spans_finished", 100, tags=(("integration_name", "datadog"),)),
mock.call("tracers", "spans_created", 100, tags=(("integration_name", "datadog"),)),
mock.call("tracers", "spans_finished", 100, tags=(("integration_name", "datadog"),)),
mock.call("tracers", "spans_created", 100, tags=(("integration_name", "datadog"),)),
mock.call("tracers", "spans_finished", 100, tags=(("integration_name", "datadog"),)),
]
[mock.call("tracers", "spans_created", tags=(("integration_name", "datadog"),)) for _ in range(300)]
)

mock_tm.reset_mock()
aggr.shutdown(None)

for _ in range(300):
span = Span("span", on_finish=[aggr.on_span_finish])
span.finish()

mock_tm.assert_has_calls(
[
mock.call("tracers", "spans_created", 1, tags=(("integration_name", "datadog"),)),
mock.call("tracers", "spans_finished", 1, tags=(("integration_name", "datadog"),)),
]
[mock.call("tracers", "spans_finished", tags=(("integration_name", "datadog"),)) for _ in range(300)]
)

