From c59e6a0009d541eb95c8453c19df469d65c8a6c3 Mon Sep 17 00:00:00 2001 From: Fatih Acar Date: Wed, 22 Jan 2025 15:54:58 +0100 Subject: [PATCH] feat: propagate trace context from user app We may want to propagate the traceparent from an instrumented app running Prefect deployments so that we get end-to-end tracing from the app to the Prefect workers. Signed-off-by: Fatih Acar --- src/prefect/deployments/flow_runs.py | 8 ++-- src/prefect/telemetry/run_telemetry.py | 5 ++- tests/deployment/test_flow_runs.py | 54 ++++++++++++++++++++++++++ 3 files changed, 62 insertions(+), 5 deletions(-) diff --git a/src/prefect/deployments/flow_runs.py b/src/prefect/deployments/flow_runs.py index 1927933ea040..67b5e3b23abb 100644 --- a/src/prefect/deployments/flow_runs.py +++ b/src/prefect/deployments/flow_runs.py @@ -4,6 +4,8 @@ import anyio import pendulum +from opentelemetry import trace +from opentelemetry.instrumentation.utils import is_instrumentation_enabled import prefect from prefect._result_records import ResultRecordMetadata @@ -13,9 +15,7 @@ from prefect.logging import get_logger from prefect.states import Pending, Scheduled from prefect.tasks import Task -from prefect.telemetry.run_telemetry import ( - LABELS_TRACEPARENT_KEY, -) +from prefect.telemetry.run_telemetry import LABELS_TRACEPARENT_KEY, RunTelemetry from prefect.utilities.asyncutils import sync_compatible from prefect.utilities.slugify import slugify @@ -164,6 +164,8 @@ async def run_deployment( if flow_run_ctx and flow_run_ctx.flow_run: traceparent = flow_run_ctx.flow_run.labels.get(LABELS_TRACEPARENT_KEY) + elif is_instrumentation_enabled(): + traceparent = RunTelemetry.traceparent_from_span(span=trace.get_current_span()) else: traceparent = None diff --git a/src/prefect/telemetry/run_telemetry.py b/src/prefect/telemetry/run_telemetry.py index abad58f781fe..df466535f358 100644 --- a/src/prefect/telemetry/run_telemetry.py +++ b/src/prefect/telemetry/run_telemetry.py @@ -132,7 +132,7 @@ def _start_span( }, ) - if traceparent := self._traceparent_from_span(self.span): + if traceparent := RunTelemetry.traceparent_from_span(self.span): run.labels[LABELS_TRACEPARENT_KEY] = traceparent return traceparent, self.span @@ -150,7 +150,8 @@ def _trace_context_from_labels( carrier = {TRACEPARENT_KEY: traceparent} return propagate.extract(carrier) - def _traceparent_from_span(self, span: Span) -> str | None: + @staticmethod + def traceparent_from_span(span: Span) -> str | None: carrier: dict[str, Any] = {} propagate.inject(carrier, context=trace.set_span_in_context(span)) return carrier.get(TRACEPARENT_KEY) diff --git a/tests/deployment/test_flow_runs.py b/tests/deployment/test_flow_runs.py index d599e4682d5c..b059b6d2ab65 100644 --- a/tests/deployment/test_flow_runs.py +++ b/tests/deployment/test_flow_runs.py @@ -7,6 +7,7 @@ import pytest import respx from httpx import Response +from opentelemetry import trace from prefect import flow from prefect.client.schemas.responses import DeploymentResponse @@ -18,6 +19,9 @@ PREFECT_API_URL, ) from prefect.tasks import task +from prefect.telemetry.run_telemetry import ( + LABELS_TRACEPARENT_KEY, +) from prefect.utilities.slugify import slugify from tests.telemetry.instrumentation_tester import InstrumentationTester @@ -488,3 +492,53 @@ async def parent_flow(): assert child_span.parent.span_id == parent_span.get_span_context().span_id assert child_span.parent.trace_id == parent_span.get_span_context().trace_id + + async def test_propagates_otel_trace_from_app_to_deployment_flow_run( + self, + test_deployment: DeploymentResponse, + instrumentation: InstrumentationTester, + prefect_client: "PrefectClient", + ): + """Test that OTEL trace context gets propagated from external app to deployment flow run""" + deployment = test_deployment + + @flow(flow_run_name="foo-flow") + async def foo_flow() -> None: + pass + + with trace.get_tracer("prefect-test").start_as_current_span( + name="app-root-span" + ): + flow_run = await run_deployment( + f"foo/{deployment.name}", + timeout=0, + poll_interval=0, + client=prefect_client, + ) + + assert LABELS_TRACEPARENT_KEY in flow_run.labels + + # Get all spans + spans = instrumentation.get_finished_spans() + # Find parent flow span + app_root_span = next(span for span in spans if span.name == "app-root-span") + + # Reset InstrumentationTester so that the app_root_span is forgotten + instrumentation = InstrumentationTester() + + await run_flow_async(foo_flow, flow_run) + + # Get all spans + spans = instrumentation.get_finished_spans() + + foo_span = next( + span + for span in spans + if span.attributes.get("prefect.run.name") == "foo-flow" + ) + assert foo_span + assert app_root_span + + assert foo_span.parent + assert foo_span.parent.span_id == app_root_span.get_span_context().span_id + assert foo_span.parent.trace_id == app_root_span.get_span_context().trace_id