Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(llmobs): decorators can infer span links #12043

Open
wants to merge 35 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
5723509
span links
Dec 12, 2024
a1c0512
implict parent link
Dec 13, 2024
2b106d0
automatic correlation
Dec 13, 2024
05f3026
save some progress on span linking
Dec 15, 2024
6fede1f
automatic span linking poc
Dec 17, 2024
7acc0e7
oops
Dec 17, 2024
f627038
cleanmup
Dec 17, 2024
55191a5
janky solution for list, str, dicts
Dec 18, 2024
72769cc
add default logic for parents
Dec 18, 2024
a4ead29
catch fns returning iterables
Dec 18, 2024
6ce0a6f
more comp obj rel tracking
Dec 24, 2024
baf9527
refactor
Dec 30, 2024
0d147b5
cleanup
Dec 30, 2024
22d8030
cleanup record rel
Dec 30, 2024
0f92a02
helper track item if primitve fn
Dec 30, 2024
2fd2435
support getitem
Dec 30, 2024
635a138
remove a bunch of object tracking stuff
Jan 22, 2025
fe93665
conf
Jan 22, 2025
a4eadff
remove the trace processor
Jan 22, 2025
a4aa30c
add a simple test case
Jan 22, 2025
f2ed301
tests
Jan 23, 2025
8d1e5a6
change util fn name
Jan 23, 2025
641f3f9
dont set empty list of span links
Jan 23, 2025
8cfd66a
link onyl spans of same trace
Jan 23, 2025
7b76ed1
be more defensive with key errors
Jan 23, 2025
aaedab9
chore: update changelog for version 2.19.2 (#12088)
Yun-Kim Jan 27, 2025
e3045a1
fix(profiling): fix SystemError when collecting memory profiler event…
nsrip-dd Jan 27, 2025
55767a7
chore(tracing): refactor web server integrations to use the core modu…
wconti27 Jan 28, 2025
16d5280
ci(tracer): make serverless test unrequired (#12121)
christophe-papazian Jan 28, 2025
4f0bcb5
chore(asm): clean libddwaf loading (#12102)
christophe-papazian Jan 28, 2025
c4448ea
fix(llmobs): propagate distributed headers via signal dispatching, no…
Yun-Kim Jan 28, 2025
cb41f8e
feat(provider): expose context provider in ddtrace.trace (#12135)
mabdinur Jan 29, 2025
af9098c
chore(ci): skip non-linux OCI package creation (#12036)
randomanderson Jan 30, 2025
e73d001
gate behind flag
Feb 3, 2025
1e8dd3a
merg conf"
Feb 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,14 @@ ddtrace/internal/remoteconfig @DataDog/remote-config @DataDog/apm-core-pyt
tests/internal/remoteconfig @DataDog/remote-config @DataDog/apm-core-python

# API SDK
ddtrace/trace/ @DataDog/apm-sdk-api-python
ddtrace/_trace/ @DataDog/apm-sdk-api-python
ddtrace/opentelemetry/ @DataDog/apm-sdk-api-python
ddtrace/internal/opentelemetry @DataDog/apm-sdk-api-python
ddtrace/opentracer/ @DataDog/apm-sdk-api-python
ddtrace/propagation/ @DataDog/apm-sdk-api-python
ddtrace/filters.py @DataDog/apm-sdk-api-python
ddtrace/provider.py @DataDog/apm-sdk-api-python
ddtrace/pin.py @DataDog/apm-sdk-api-python
ddtrace/sampler.py @DataDog/apm-sdk-api-python
ddtrace/sampling_rule.py @DataDog/apm-sdk-api-python
Expand Down
1 change: 1 addition & 0 deletions .gitlab/benchmarks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ benchmark-serverless:
tags: ["arch:amd64"]
when: on_success
needs: [ "benchmark-serverless-trigger" ]
allow_failure: true
script:
- git clone https://gitlab-ci-token:${CI_JOB_TOKEN}@gitlab.ddbuild.io/DataDog/serverless-tools.git ./serverless-tools && cd ./serverless-tools
- ./ci/check_trigger_status.sh
Expand Down
5 changes: 5 additions & 0 deletions .gitlab/prepare-oci-package.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
#!/bin/bash
set -eo pipefail

if [ "$OS" != "linux" ]; then
echo "Only linux packages are supported. Exiting"
exit 0
fi

if [ -n "$CI_COMMIT_TAG" ] && [ -z "$PYTHON_PACKAGE_VERSION" ]; then
PYTHON_PACKAGE_VERSION=${CI_COMMIT_TAG##v}
fi
Expand Down
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,21 @@ Changelogs for versions not listed here can be found at https://github.com/DataD

---

## 2.19.2
### Bug Fixes

- Tracing
- celery: Fixes an issue where `celery.apply` spans from Celery prerun got closed too soon leading to span tags being missing.
- openai: Fixes a patching issue where asynchronous moderation endpoint calls resulted in coroutine scheduling errors.
- openai: Ensures the OpenAI integration is compatible with Python versions 3.12 and 3.13.
- vertexai: Resolves an issue with `chat.send_message()` where the content keyword argument was not parsed correctly.
- LLM Observability
- This fix resolves an issue where annotating a span with non latin-1 (but valid utf-8) input/output values resulted in encoding errors.
- Lib-Injection
- Fixes incorrect telemetry data payload format.

---

## 2.19.1
### Bug Fixes

Expand Down
56 changes: 54 additions & 2 deletions ddtrace/_trace/trace_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,14 @@ def _get_parameters_for_new_span_directly_from_context(ctx: core.ExecutionContex
def _start_span(ctx: core.ExecutionContext, call_trace: bool = True, **kwargs) -> "Span":
span_kwargs = _get_parameters_for_new_span_directly_from_context(ctx)
call_trace = ctx.get_item("call_trace", call_trace)
tracer = (ctx.get_item("middleware") or ctx["pin"]).tracer
tracer = ctx.get_item("tracer") or (ctx.get_item("middleware") or ctx["pin"]).tracer
distributed_headers_config = ctx.get_item("distributed_headers_config")
if distributed_headers_config:
trace_utils.activate_distributed_headers(
tracer, int_config=distributed_headers_config, request_headers=ctx["distributed_headers"]
tracer,
int_config=distributed_headers_config,
request_headers=ctx["distributed_headers"],
override=ctx.get_item("distributed_headers_config_override"),
)
distributed_context = ctx.get_item("distributed_context")
if distributed_context and not call_trace:
Expand All @@ -126,6 +129,42 @@ def _start_span(ctx: core.ExecutionContext, call_trace: bool = True, **kwargs) -
return span


def _set_web_frameworks_tags(ctx, span, int_config):
span.set_tag_str(COMPONENT, int_config.integration_name)
span.set_tag_str(SPAN_KIND, SpanKind.SERVER)
span.set_tag(_SPAN_MEASURED_KEY)

analytics_enabled = ctx.get_item("analytics_enabled")
analytics_sample_rate = ctx.get_item("analytics_sample_rate", True)

# Configure trace search sample rate
if (config._analytics_enabled and analytics_enabled is not False) or analytics_enabled is True:
span.set_tag(_ANALYTICS_SAMPLE_RATE_KEY, analytics_sample_rate)


def _on_web_framework_start_request(ctx, int_config):
request_span = ctx.get_item("req_span")
_set_web_frameworks_tags(ctx, request_span, int_config)


def _on_web_framework_finish_request(
span, int_config, method, url, status_code, query, req_headers, res_headers, route, finish
):
trace_utils.set_http_meta(
span=span,
integration_config=int_config,
method=method,
url=url,
status_code=status_code,
query=query,
request_headers=req_headers,
response_headers=res_headers,
route=route,
)
if finish:
span.finish()


def _on_traced_request_context_started_flask(ctx):
current_span = ctx["pin"].tracer.current_span()
if not ctx["pin"].enabled or not current_span:
Expand Down Expand Up @@ -761,6 +800,10 @@ def listen():
core.on("azure.functions.request_call_modifier", _on_azure_functions_request_span_modifier)
core.on("azure.functions.start_response", _on_azure_functions_start_response)

# web frameworks general handlers
core.on("web.request.start", _on_web_framework_start_request)
core.on("web.request.finish", _on_web_framework_finish_request)

core.on("test_visibility.enable", _on_test_visibility_enable)
core.on("test_visibility.disable", _on_test_visibility_disable)
core.on("test_visibility.is_enabled", _on_test_visibility_is_enabled, "is_enabled")
Expand All @@ -769,6 +812,14 @@ def listen():
core.on("rq.queue.enqueue_job", _propagate_context)

for context_name in (
# web frameworks
"aiohttp.request",
"bottle.request",
"cherrypy.request",
"falcon.request",
"molten.request",
"pyramid.request",
"sanic.request",
"flask.call",
"flask.jsonify",
"flask.render_template",
Expand All @@ -779,6 +830,7 @@ def listen():
"django.template.render",
"django.process_exception",
"django.func.wrapped",
# non web frameworks
"botocore.instrumented_api_call",
"botocore.instrumented_lib_function",
"botocore.patched_kinesis_api_call",
Expand Down
3 changes: 3 additions & 0 deletions ddtrace/appsec/_ddwaf/ddwaf_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@

if system() == "Linux":
try:
asm_config._bypass_instrumentation_for_waf = True
ctypes.CDLL(ctypes.util.find_library("rt"), mode=ctypes.RTLD_GLOBAL)
except Exception: # nosec
pass
finally:
asm_config._bypass_instrumentation_for_waf = False

ARCHI = machine().lower()

Expand Down
26 changes: 14 additions & 12 deletions ddtrace/appsec/_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,19 @@
from json.decoder import JSONDecodeError
import os
import os.path
from typing import TYPE_CHECKING
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Set
from typing import Tuple
from typing import Union


if TYPE_CHECKING:
import ddtrace.appsec._ddwaf as ddwaf

import weakref

from ddtrace._trace.processor import SpanProcessor
Expand Down Expand Up @@ -167,14 +173,17 @@ def __post_init__(self) -> None:
def delayed_init(self) -> None:
try:
if self._rules is not None and not hasattr(self, "_ddwaf"):
self._ddwaf = ddwaf.DDWaf(
from ddtrace.appsec._ddwaf import DDWaf # noqa: E402
import ddtrace.appsec._metrics as metrics # noqa: E402

self.metrics = metrics
self._ddwaf = DDWaf(
self._rules, self.obfuscation_parameter_key_regexp, self.obfuscation_parameter_value_regexp
)
_set_waf_init_metric(self._ddwaf.info)
self.metrics._set_waf_init_metric(self._ddwaf.info)
except Exception:
# Partial of DDAS-0005-00
log.warning("[DDAS-0005-00] WAF initialization failed")
raise
self._update_required()

def _update_required(self):
Expand All @@ -193,7 +202,7 @@ def _update_rules(self, new_rules: Dict[str, Any]) -> bool:
if asm_config._asm_static_rule_file is not None:
return result
result = self._ddwaf.update_rules(new_rules)
_set_waf_updates_metric(self._ddwaf.info)
self.metrics._set_waf_updates_metric(self._ddwaf.info)
self._update_required()
return result

Expand Down Expand Up @@ -241,7 +250,7 @@ def waf_callable(custom_data=None, **kwargs):
return self._waf_action(span._local_root or span, ctx, custom_data, **kwargs)

_asm_request_context.set_waf_callback(waf_callable)
_asm_request_context.add_context_callback(_set_waf_request_metrics)
_asm_request_context.add_context_callback(self.metrics._set_waf_request_metrics)
if headers is not None:
_asm_request_context.set_waf_address(SPAN_DATA_NAMES.REQUEST_HEADERS_NO_COOKIES, headers)
_asm_request_context.set_waf_address(
Expand Down Expand Up @@ -436,10 +445,3 @@ def on_span_finish(self, span: Span) -> None:
del self._span_to_waf_ctx[s]
except Exception: # nosec B110
pass


# load waf at the end only to avoid possible circular imports with gevent
import ddtrace.appsec._ddwaf as ddwaf # noqa: E402
from ddtrace.appsec._metrics import _set_waf_init_metric # noqa: E402
from ddtrace.appsec._metrics import _set_waf_request_metrics # noqa: E402
from ddtrace.appsec._metrics import _set_waf_updates_metric # noqa: E402
105 changes: 49 additions & 56 deletions ddtrace/contrib/internal/aiohttp/middlewares.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,10 @@
from aiohttp.web_urldispatcher import SystemRoute

from ddtrace import config
from ddtrace.constants import _ANALYTICS_SAMPLE_RATE_KEY
from ddtrace.constants import _SPAN_MEASURED_KEY
from ddtrace.constants import SPAN_KIND
from ddtrace.contrib import trace_utils
from ddtrace.contrib.asyncio import context_provider
from ddtrace.ext import SpanKind
from ddtrace.ext import SpanTypes
from ddtrace.ext import http
from ddtrace.internal.constants import COMPONENT
from ddtrace.internal import core
from ddtrace.internal.schema import schematize_url_operation
from ddtrace.internal.schema.span_attribute_schema import SpanDirection

Expand All @@ -35,47 +30,42 @@ async def attach_context(request):
# application configs
tracer = app[CONFIG_KEY]["tracer"]
service = app[CONFIG_KEY]["service"]
distributed_tracing = app[CONFIG_KEY]["distributed_tracing_enabled"]
# Create a new context based on the propagated information.
trace_utils.activate_distributed_headers(
tracer,
int_config=config.aiohttp,
request_headers=request.headers,
override=distributed_tracing,
)

# trace the handler
request_span = tracer.trace(
schematize_url_operation("aiohttp.request", protocol="http", direction=SpanDirection.INBOUND),
service=service,
span_type=SpanTypes.WEB,
)
request_span.set_tag(_SPAN_MEASURED_KEY)

request_span.set_tag_str(COMPONENT, config.aiohttp.integration_name)

# set span.kind tag equal to type of request
request_span.set_tag_str(SPAN_KIND, SpanKind.SERVER)

# Configure trace search sample rate
# DEV: aiohttp is special case maintains separate configuration from config api
analytics_enabled = app[CONFIG_KEY]["analytics_enabled"]
if (config._analytics_enabled and analytics_enabled is not False) or analytics_enabled is True:
request_span.set_tag(_ANALYTICS_SAMPLE_RATE_KEY, app[CONFIG_KEY].get("analytics_sample_rate", True))

# attach the context and the root span to the request; the Context
# may be freely used by the application code
request[REQUEST_CONTEXT_KEY] = request_span.context
request[REQUEST_SPAN_KEY] = request_span
request[REQUEST_CONFIG_KEY] = app[CONFIG_KEY]
try:
response = await handler(request)
if isinstance(response, web.StreamResponse):
request.task.add_done_callback(lambda _: finish_request_span(request, response))
return response
except Exception:
request_span.set_traceback()
raise
# Create a new context based on the propagated information.

with core.context_with_data(
"aiohttp.request",
span_name=schematize_url_operation("aiohttp.request", protocol="http", direction=SpanDirection.INBOUND),
span_type=SpanTypes.WEB,
service=service,
tags={},
tracer=tracer,
distributed_headers=request.headers,
distributed_headers_config=config.aiohttp,
distributed_headers_config_override=app[CONFIG_KEY]["distributed_tracing_enabled"],
headers_case_sensitive=True,
analytics_enabled=analytics_enabled,
analytics_sample_rate=app[CONFIG_KEY].get("analytics_sample_rate", True),
) as ctx:
req_span = ctx.span

ctx.set_item("req_span", req_span)
core.dispatch("web.request.start", (ctx, config.aiohttp))

# attach the context and the root span to the request; the Context
# may be freely used by the application code
request[REQUEST_CONTEXT_KEY] = req_span.context
request[REQUEST_SPAN_KEY] = req_span
request[REQUEST_CONFIG_KEY] = app[CONFIG_KEY]
try:
response = await handler(request)
if isinstance(response, web.StreamResponse):
request.task.add_done_callback(lambda _: finish_request_span(request, response))
return response
except Exception:
req_span.set_traceback()
raise

return attach_context

Expand Down Expand Up @@ -122,19 +112,22 @@ def finish_request_span(request, response):
# SystemRoute objects exist to throw HTTP errors and have no path
route = aiohttp_route.resource.canonical

trace_utils.set_http_meta(
request_span,
config.aiohttp,
method=request.method,
url=str(request.url), # DEV: request.url is a yarl's URL object
status_code=response.status,
request_headers=request.headers,
response_headers=response.headers,
route=route,
core.dispatch(
"web.request.finish",
(
request_span,
config.aiohttp,
request.method,
str(request.url), # DEV: request.url is a yarl's URL object
response.status,
None, # query arg = None
request.headers,
response.headers,
route,
True,
),
)

request_span.finish()


async def on_prepare(request, response):
"""
Expand Down
Loading
Loading