Skip to content

Commit 7d1b086

Browse files
Merge branch 'main' into duncan-harvey/azure-eventhubs-integration
2 parents 1215b02 + 0dc1c91 commit 7d1b086

File tree

22 files changed

+548
-28
lines changed

22 files changed

+548
-28
lines changed

.github/workflows/test_lib_injection.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ jobs:
3030
- "3.11"
3131
- "3.12"
3232
- "3.13"
33-
- "3.14.0rc3"
33+
- "3.14"
3434
steps:
3535
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
3636
with:

.gitlab/benchmarks/bp-runner.microbenchmarks.fail-on-breach.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1067,7 +1067,7 @@ experiments:
10671067
- max_rss_usage < 34.00 MB
10681068
- name: span-start-finish-traceid128
10691069
thresholds:
1070-
- execution_time < 55.00 ms
1070+
- execution_time < 56.00 ms
10711071
- max_rss_usage < 34.00 MB
10721072
- name: span-start-traceid128
10731073
thresholds:

ddtrace/_trace/tracer.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,14 @@
5353
from ddtrace.internal.runtime import get_runtime_id
5454
from ddtrace.internal.schema.processor import BaseServiceProcessor
5555
from ddtrace.internal.utils import _get_metas_to_propagate
56+
from ddtrace.internal.utils.deprecations import DDTraceDeprecationWarning
5657
from ddtrace.internal.utils.formats import format_trace_id
5758
from ddtrace.internal.writer import AgentWriterInterface
5859
from ddtrace.internal.writer import HTTPWriter
5960
from ddtrace.settings._config import config
6061
from ddtrace.settings.asm import config as asm_config
6162
from ddtrace.settings.peer_service import _ps_config
63+
from ddtrace.vendor.debtcollector.removals import remove
6264
from ddtrace.version import get_version
6365

6466

@@ -197,6 +199,11 @@ def _atexit(self) -> None:
197199
)
198200
self.shutdown(timeout=self.SHUTDOWN_TIMEOUT)
199201

202+
@remove(
203+
message="on_start_span is being removed with no replacement",
204+
removal_version="4.0.0",
205+
category=DDTraceDeprecationWarning,
206+
)
200207
def on_start_span(self, func: Callable[[Span], None]) -> Callable[[Span], None]:
201208
"""Register a function to execute when a span start.
202209
@@ -208,6 +215,11 @@ def on_start_span(self, func: Callable[[Span], None]) -> Callable[[Span], None]:
208215
core.on("trace.span_start", callback=func)
209216
return func
210217

218+
@remove(
219+
message="deregister_on_start_span is being removed with no replacement",
220+
removal_version="4.0.0",
221+
category=DDTraceDeprecationWarning,
222+
)
211223
def deregister_on_start_span(self, func: Callable[[Span], None]) -> Callable[[Span], None]:
212224
"""Unregister a function registered to execute when a span starts.
213225

ddtrace/appsec/_asm_request_context.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import json
33
import re
44
from types import TracebackType
5+
from typing import TYPE_CHECKING
56
from typing import Any
67
from typing import Callable
78
from typing import Dict
@@ -17,8 +18,6 @@
1718
from ddtrace.appsec._constants import APPSEC
1819
from ddtrace.appsec._constants import SPAN_DATA_NAMES
1920
from ddtrace.appsec._constants import Constant_Class
20-
from ddtrace.appsec._utils import DDWaf_info
21-
from ddtrace.appsec._utils import DDWaf_result
2221
from ddtrace.appsec._utils import Telemetry_result
2322
from ddtrace.appsec._utils import get_triggers
2423
from ddtrace.contrib.internal.trace_utils_base import _normalize_tag_name
@@ -29,6 +28,10 @@
2928
from ddtrace.settings.asm import config as asm_config
3029

3130

31+
if TYPE_CHECKING:
32+
from ddtrace.appsec._utils import DDWaf_info
33+
from ddtrace.appsec._utils import DDWaf_result
34+
3235
logger = ddlogger.get_logger(__name__)
3336

3437

@@ -95,7 +98,7 @@ def __init__(self, span: Optional[Span] = None, rc_products: str = ""):
9598
else:
9699
self.framework = self.span.name
97100
self.framework = self.framework.lower().replace(" ", "_")
98-
self.waf_info: Optional[Callable[[], DDWaf_info]] = None
101+
self.waf_info: Optional[Callable[[], "DDWaf_info"]] = None
99102
self.waf_addresses: Dict[str, Any] = {}
100103
self.callbacks: Dict[str, Any] = {_CONTEXT_CALL: []}
101104
self.telemetry: Telemetry_result = Telemetry_result()
@@ -375,15 +378,15 @@ def set_waf_callback(value) -> None:
375378
set_value(_CALLBACKS, _WAF_CALL, value)
376379

377380

378-
def set_waf_info(info: Callable[[], DDWaf_info]) -> None:
381+
def set_waf_info(info: Callable[[], "DDWaf_info"]) -> None:
379382
env = _get_asm_context()
380383
if env is None:
381384
logger.warning(WARNING_TAGS.SET_WAF_INFO_NO_ASM_CONTEXT, extra=log_extra, stack_info=True)
382385
return
383386
env.waf_info = info
384387

385388

386-
def call_waf_callback(custom_data: Optional[Dict[str, Any]] = None, **kwargs) -> Optional[DDWaf_result]:
389+
def call_waf_callback(custom_data: Optional[Dict[str, Any]] = None, **kwargs) -> Optional["DDWaf_result"]:
387390
if not asm_config._asm_enabled:
388391
return None
389392
callback = get_value(_CALLBACKS, _WAF_CALL)
@@ -480,7 +483,7 @@ def asm_request_context_set(
480483
def set_waf_telemetry_results(
481484
rules_version: str,
482485
is_blocked: bool,
483-
waf_results: DDWaf_result,
486+
waf_results: "DDWaf_result",
484487
rule_type: Optional[str],
485488
is_sampled: bool,
486489
) -> None:

ddtrace/appsec/_utils.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
from typing import List
1111
from typing import Optional
1212
from typing import Union
13-
import uuid
1413

1514
from ddtrace.appsec._constants import API_SECURITY
1615
from ddtrace.appsec._constants import APPSEC
@@ -236,6 +235,10 @@ def _safe_userid(user_id):
236235
return user_id
237236
except ValueError:
238237
try:
238+
# Import uuid lazily because this also imports threading via the
239+
# platform module
240+
import uuid
241+
239242
_ = uuid.UUID(user_id)
240243
return user_id
241244
except ValueError:

ddtrace/bootstrap/sitecustomize.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,11 @@ def drop(module_name):
5555
if not asbool(do_cleanup):
5656
return
5757

58+
# We need to import these modules to make sure they grab references to the
59+
# right modules before we start unloading stuff.
60+
import ddtrace.internal.http # noqa
61+
import ddtrace.internal.uds # noqa
62+
5863
# Unload all the modules that we have imported, except for the ddtrace one.
5964
# NB: this means that every `import threading` anywhere in `ddtrace/` code
6065
# uses a copy of that module that is distinct from the copy that user code
@@ -94,6 +99,10 @@ def drop(module_name):
9499
# submodule makes use of threading so it is critical to unload when
95100
# gevent is used.
96101
"concurrent.futures",
102+
# We unload the threading module in case it was imported by
103+
# CPython on boot.
104+
"threading",
105+
"_thread",
97106
]
98107
)
99108
for u in UNLOAD_MODULES:

ddtrace/internal/coverage/code.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
from ddtrace.internal.coverage.util import collapse_ranges
1515
from ddtrace.internal.logger import get_logger
1616
from ddtrace.internal.module import ModuleWatchdog
17-
from ddtrace.internal.packages import is_user_code
1817
from ddtrace.internal.packages import platlib_path
1918
from ddtrace.internal.packages import platstdlib_path
2019
from ddtrace.internal.packages import purelib_path
@@ -336,9 +335,6 @@ def transform(self, code: CodeType, _module: ModuleType) -> CodeType:
336335
# Don't instrument code from standard library/site packages/etc.
337336
return code
338337

339-
if not is_user_code(code_path):
340-
return code
341-
342338
retval = self.instrument_code(code, _module.__package__ if _module is not None else "")
343339

344340
if self._collect_import_coverage:
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
"""
2+
Feature Flagging and Experimentation (FFAndE) product module.
3+
4+
This module handles Feature Flag configuration rules from Remote Configuration
5+
and forwards the raw bytes to the native FFAndE processor.
6+
"""

ddtrace/internal/ffande/_native.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
"""
2+
Native interface for FFAndE (Feature Flagging and Experimentation) processing.
3+
4+
This module provides the interface to the PyO3 native function that processes
5+
feature flag configuration rules.
6+
"""
7+
from typing import Optional
8+
9+
from ddtrace.internal.logger import get_logger
10+
11+
12+
log = get_logger(__name__)
13+
14+
is_available = True
15+
16+
try:
17+
from ddtrace.internal.native._native import ffande_process_config
18+
except ImportError:
19+
is_available = False
20+
log.debug("FFAndE native module not available, feature flag processing disabled")
21+
22+
# Provide a no-op fallback
23+
def ffande_process_config(config_bytes: bytes) -> Optional[bool]:
24+
"""Fallback implementation when native module is not available."""
25+
log.warning("FFAndE native module not available, ignoring configuration")
26+
return None
27+
28+
29+
def process_ffe_configuration(config_bytes: bytes) -> bool:
30+
"""
31+
Process feature flag configuration by forwarding raw bytes to native function.
32+
33+
Args:
34+
config_bytes: Raw bytes from Remote Configuration payload
35+
36+
Returns:
37+
True if processing was successful, False otherwise
38+
"""
39+
if not is_available:
40+
log.debug("FFAndE native module not available, skipping configuration")
41+
return False
42+
43+
try:
44+
result = ffande_process_config(config_bytes)
45+
if result is None:
46+
log.debug("FFAndE native processing returned None")
47+
return False
48+
return result
49+
except Exception as e:
50+
log.error("Error processing FFE configuration: %s", e, exc_info=True)
51+
return False

ddtrace/internal/ffande/product.py

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
"""
2+
FFAndE (Feature Flagging and Experimentation) product implementation.
3+
4+
This product receives feature flag configuration rules from Remote Configuration
5+
and processes them through the native FFAndE processor.
6+
"""
7+
import enum
8+
import json
9+
import typing as t
10+
11+
from ddtrace import config
12+
from ddtrace.internal.ffande._native import process_ffe_configuration
13+
from ddtrace.internal.logger import get_logger
14+
from ddtrace.internal.remoteconfig import Payload
15+
from ddtrace.internal.remoteconfig._connectors import PublisherSubscriberConnector
16+
from ddtrace.internal.remoteconfig._publishers import RemoteConfigPublisher
17+
from ddtrace.internal.remoteconfig._pubsub import PubSub
18+
from ddtrace.internal.remoteconfig._subscribers import RemoteConfigSubscriber
19+
20+
21+
requires = ["remote-configuration"]
22+
23+
log = get_logger(__name__)
24+
25+
# FFAndE product name constant
26+
FFE_FLAGS_PRODUCT = "FFE_FLAGS"
27+
28+
29+
class FFAndECapabilities(enum.IntFlag):
30+
"""FFAndE Remote Configuration capabilities."""
31+
32+
FFE_FLAG_CONFIGURATION_RULES = 1 << 46
33+
34+
35+
class FFAndEAdapter(PubSub):
36+
"""
37+
FFAndE Remote Configuration adapter.
38+
39+
Receives feature flag configuration rules and forwards raw bytes to native processor.
40+
"""
41+
42+
__publisher_class__ = RemoteConfigPublisher
43+
__subscriber_class__ = RemoteConfigSubscriber
44+
__shared_data__ = PublisherSubscriberConnector()
45+
46+
def __init__(self):
47+
self._publisher = self.__publisher_class__(self.__shared_data__)
48+
self._subscriber = self.__subscriber_class__(self.__shared_data__, self.rc_callback, FFE_FLAGS_PRODUCT)
49+
50+
# Register with Remote Configuration poller
51+
if config._remote_config_enabled:
52+
from ddtrace.internal.remoteconfig.worker import remoteconfig_poller
53+
54+
remoteconfig_poller.register(
55+
FFE_FLAGS_PRODUCT,
56+
self,
57+
restart_on_fork=True,
58+
capabilities=[FFAndECapabilities.FFE_FLAG_CONFIGURATION_RULES],
59+
)
60+
61+
def rc_callback(self, payloads: t.Sequence[Payload]) -> None:
62+
"""
63+
Process FFE configuration payloads from Remote Configuration.
64+
65+
Args:
66+
payloads: Sequence of configuration payloads
67+
"""
68+
for payload in payloads:
69+
if payload.metadata is None:
70+
log.debug("Ignoring invalid FFE payload with no metadata, path: %s", payload.path)
71+
continue
72+
73+
log.debug("Received FFE config payload: %s", payload.metadata.id)
74+
75+
if payload.content is None:
76+
log.debug(
77+
"Received FFE config deletion, product: %s, path: %s",
78+
payload.metadata.product_name,
79+
payload.path,
80+
)
81+
# Handle deletion/removal of configuration
82+
continue
83+
84+
try:
85+
# Serialize payload content to bytes for native processing
86+
# The native function expects raw bytes, so we convert the dict to JSON
87+
config_json = json.dumps(payload.content, ensure_ascii=False)
88+
config_bytes = config_json.encode("utf-8")
89+
90+
log.debug("Processing FFE config ID: %s, size: %d bytes", payload.metadata.id, len(config_bytes))
91+
92+
success = process_ffe_configuration(config_bytes)
93+
if success:
94+
log.debug("Successfully processed FFE config ID: %s", payload.metadata.id)
95+
else:
96+
log.warning("Failed to process FFE config ID: %s", payload.metadata.id)
97+
98+
except Exception as e:
99+
log.error("Error processing FFE config payload: %s", e, exc_info=True)
100+
101+
102+
_adapter: t.Optional[FFAndEAdapter] = None
103+
104+
105+
def post_preload():
106+
"""Called during preload phase."""
107+
pass
108+
109+
110+
def start():
111+
"""Start the FFAndE product and register with Remote Configuration."""
112+
global _adapter
113+
114+
if not config._remote_config_enabled:
115+
log.debug("Remote configuration disabled, FFAndE not started")
116+
return
117+
118+
_adapter = FFAndEAdapter()
119+
120+
log.debug("FFAndE product registered with Remote Configuration")
121+
122+
123+
def restart(join=False):
124+
"""Restart the FFAndE product."""
125+
pass
126+
127+
128+
def stop(join=False):
129+
"""Stop the FFAndE product."""
130+
global _adapter
131+
132+
if not config._remote_config_enabled:
133+
return
134+
135+
from ddtrace.internal.remoteconfig.worker import remoteconfig_poller
136+
137+
remoteconfig_poller.unregister(FFE_FLAGS_PRODUCT)
138+
_adapter = None
139+
140+
log.debug("FFAndE product unregistered from Remote Configuration")
141+
142+
143+
def at_exit(join=False):
144+
"""Called at exit."""
145+
stop(join=join)

0 commit comments

Comments
 (0)