Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: propagate trace context from user app #16812

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/prefect/deployments/flow_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import anyio
import pendulum
from opentelemetry import trace
from opentelemetry.instrumentation.utils import is_instrumentation_enabled

import prefect
from prefect._result_records import ResultRecordMetadata
Expand All @@ -13,9 +15,7 @@
from prefect.logging import get_logger
from prefect.states import Pending, Scheduled
from prefect.tasks import Task
from prefect.telemetry.run_telemetry import (
LABELS_TRACEPARENT_KEY,
)
from prefect.telemetry.run_telemetry import LABELS_TRACEPARENT_KEY, RunTelemetry
from prefect.utilities.asyncutils import sync_compatible
from prefect.utilities.slugify import slugify

Expand Down Expand Up @@ -164,6 +164,8 @@ async def run_deployment(

if flow_run_ctx and flow_run_ctx.flow_run:
traceparent = flow_run_ctx.flow_run.labels.get(LABELS_TRACEPARENT_KEY)
elif is_instrumentation_enabled():
traceparent = RunTelemetry.traceparent_from_span(span=trace.get_current_span())
else:
traceparent = None

Expand Down
5 changes: 3 additions & 2 deletions src/prefect/telemetry/run_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def _start_span(
},
)

if traceparent := self._traceparent_from_span(self.span):
if traceparent := RunTelemetry.traceparent_from_span(self.span):
run.labels[LABELS_TRACEPARENT_KEY] = traceparent

return traceparent, self.span
Expand All @@ -150,7 +150,8 @@ def _trace_context_from_labels(
carrier = {TRACEPARENT_KEY: traceparent}
return propagate.extract(carrier)

def _traceparent_from_span(self, span: Span) -> str | None:
@staticmethod
def traceparent_from_span(span: Span) -> str | None:
carrier: dict[str, Any] = {}
propagate.inject(carrier, context=trace.set_span_in_context(span))
return carrier.get(TRACEPARENT_KEY)
Expand Down
54 changes: 54 additions & 0 deletions tests/deployment/test_flow_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pytest
import respx
from httpx import Response
from opentelemetry import trace

from prefect import flow
from prefect.client.schemas.responses import DeploymentResponse
Expand All @@ -18,6 +19,9 @@
PREFECT_API_URL,
)
from prefect.tasks import task
from prefect.telemetry.run_telemetry import (
LABELS_TRACEPARENT_KEY,
)
from prefect.utilities.slugify import slugify
from tests.telemetry.instrumentation_tester import InstrumentationTester

Expand Down Expand Up @@ -488,3 +492,53 @@ async def parent_flow():

assert child_span.parent.span_id == parent_span.get_span_context().span_id
assert child_span.parent.trace_id == parent_span.get_span_context().trace_id

async def test_propagates_otel_trace_from_app_to_deployment_flow_run(
self,
test_deployment: DeploymentResponse,
instrumentation: InstrumentationTester,
prefect_client: "PrefectClient",
):
"""Test that OTEL trace context gets propagated from external app to deployment flow run"""
deployment = test_deployment

@flow(flow_run_name="foo-flow")
async def foo_flow() -> None:
pass

with trace.get_tracer("prefect-test").start_as_current_span(
name="app-root-span"
):
flow_run = await run_deployment(
f"foo/{deployment.name}",
timeout=0,
poll_interval=0,
client=prefect_client,
)

assert LABELS_TRACEPARENT_KEY in flow_run.labels

# Get all spans
spans = instrumentation.get_finished_spans()
# Find parent flow span
app_root_span = next(span for span in spans if span.name == "app-root-span")

# Reset InstrumentationTester so that the app_root_span is forgotten
instrumentation = InstrumentationTester()

await run_flow_async(foo_flow, flow_run)

# Get all spans
spans = instrumentation.get_finished_spans()

foo_span = next(
span
for span in spans
if span.attributes.get("prefect.run.name") == "foo-flow"
)
assert foo_span
assert app_root_span

assert foo_span.parent
assert foo_span.parent.span_id == app_root_span.get_span_context().span_id
assert foo_span.parent.trace_id == app_root_span.get_span_context().trace_id