diff --git a/ddtrace/contrib/internal/celery/app.py b/ddtrace/contrib/internal/celery/app.py index 4e305eebec3..e222abd5c17 100644 --- a/ddtrace/contrib/internal/celery/app.py +++ b/ddtrace/contrib/internal/celery/app.py @@ -1,3 +1,5 @@ +import sys + import celery from celery import signals @@ -15,9 +17,14 @@ from ddtrace.contrib.internal.celery.signals import trace_retry from ddtrace.ext import SpanKind from ddtrace.ext import SpanTypes +from ddtrace.internal import core +from ddtrace.internal.logger import get_logger from ddtrace.pin import _DD_PIN_NAME +log = get_logger(__name__) + + def patch_app(app, pin=None): """Attach the Pin class to the application and connect our handlers to Celery signals. @@ -41,6 +48,9 @@ def patch_app(app, pin=None): trace_utils.wrap("celery.beat", "Scheduler.tick", _traced_beat_function(config.celery, "tick")) pin.onto(celery.beat.Scheduler) + # Patch apply_async + trace_utils.wrap("celery.app.task", "Task.apply_async", _traced_apply_async_function(config.celery, "apply_async")) + # connect to the Signal framework signals.task_prerun.connect(trace_prerun, weak=False) signals.task_postrun.connect(trace_postrun, weak=False) @@ -65,6 +75,7 @@ def unpatch_app(app): trace_utils.unwrap(celery.beat.Scheduler, "apply_entry") trace_utils.unwrap(celery.beat.Scheduler, "tick") + trace_utils.unwrap(celery.app.task.Task, "apply_async") signals.task_prerun.disconnect(trace_prerun) signals.task_postrun.disconnect(trace_postrun) @@ -96,3 +107,51 @@ def _traced_beat_inner(func, instance, args, kwargs): return func(*args, **kwargs) return _traced_beat_inner + + +def _traced_apply_async_function(integration_config, fn_name, resource_fn=None): + """ + When apply_async is called, it calls various Celery signals in order, which gets used + to start and close the span. + Example: before_task_publish starts the span while after_task_publish closes the span. 
+ If an exception occurs anywhere inside Celery or its dependencies, this can interrupt the + closing signals. + The purpose of _traced_apply_async_function is to close the spans even if one of the closing + signals doesn't get called over the course of the apply_async lifecycle. + This is done by fetching the stored span and closing it if it hasn't already been closed by a + closing signal. + """ + + def _traced_apply_async_inner(func, instance, args, kwargs): + with core.context_with_data("task_context"): + try: + return func(*args, **kwargs) + except Exception: + # If an internal exception occurs, record the exception in the span, + # then raise the Celery error as usual + task_span = core.get_item("task_span") + if task_span: + task_span.set_exc_info(*sys.exc_info()) + + prerun_span = core.get_item("prerun_span") + if prerun_span: + prerun_span.set_exc_info(*sys.exc_info()) + + raise + finally: + task_span = core.get_item("task_span") + if task_span: + log.debug( + "The after_task_publish signal was not called, so manually closing span: %s", + task_span._pprint(), + ) + task_span.finish() + + prerun_span = core.get_item("prerun_span") + if prerun_span: + log.debug( + "The task_postrun signal was not called, so manually closing span: %s", prerun_span._pprint() + ) + prerun_span.finish() + + return _traced_apply_async_inner diff --git a/ddtrace/contrib/internal/celery/signals.py b/ddtrace/contrib/internal/celery/signals.py index 1f8dc12dce5..308724c6dae 100644 --- a/ddtrace/contrib/internal/celery/signals.py +++ b/ddtrace/contrib/internal/celery/signals.py @@ -16,6 +16,7 @@ from ddtrace.ext import SpanKind from ddtrace.ext import SpanTypes from ddtrace.ext import net +from ddtrace.internal import core from ddtrace.internal.constants import COMPONENT from ddtrace.internal.logger import get_logger from ddtrace.propagation.http import HTTPPropagator @@ -48,6 +49,9 @@ def trace_prerun(*args, **kwargs): service = config.celery["worker_service_name"] span = 
pin.tracer.trace(c.WORKER_ROOT_SPAN, service=service, resource=task.name, span_type=SpanTypes.WORKER) + # Store an item called "prerun_span" in case task_postrun doesn't get called + core.set_item("prerun_span", span) + # set span.kind to the type of request being performed span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER) @@ -111,6 +115,9 @@ def trace_before_publish(*args, **kwargs): service = config.celery["producer_service_name"] span = pin.tracer.trace(c.PRODUCER_ROOT_SPAN, service=service, resource=task_name) + # Store an item called "task_span" in case after_task_publish doesn't get called + core.set_item("task_span", span) + span.set_tag_str(COMPONENT, config.celery.integration_name) # set span.kind to the type of request being performed diff --git a/releasenotes/notes/fix-celery-apply-async-span-close-b7a8db188459f5b5.yaml b/releasenotes/notes/fix-celery-apply-async-span-close-b7a8db188459f5b5.yaml new file mode 100644 index 00000000000..4ca112a2cfb --- /dev/null +++ b/releasenotes/notes/fix-celery-apply-async-span-close-b7a8db188459f5b5.yaml @@ -0,0 +1,4 @@ +--- +fixes: + - | + tracing(celery): Fixes an issue where ``celery.apply`` spans didn't close if the ``after_task_publish`` or ``task_postrun`` signals didn't get sent when using ``apply_async``, which can happen if there is an internal exception during the handling of the task. This update also marks the span as an error if an exception occurs. 
\ No newline at end of file diff --git a/tests/contrib/celery/test_integration.py b/tests/contrib/celery/test_integration.py index 267789d325a..3caace9e269 100644 --- a/tests/contrib/celery/test_integration.py +++ b/tests/contrib/celery/test_integration.py @@ -6,6 +6,7 @@ import celery from celery.exceptions import Retry +import mock import pytest from ddtrace import Pin @@ -442,6 +443,34 @@ def run(self): assert span.get_tag("span.kind") == "consumer" assert span.error == 0 + @mock.patch("kombu.messaging.Producer.publish", mock.Mock(side_effect=ValueError)) + def test_fn_task_apply_async_soft_exception(self): + # If the underlying library runs into an exception that doesn't crash the app + # while calling apply_async, we should still close the span even + # if the closing signals didn't get called and mark the span as an error + + @self.app.task + def fn_task_parameters(user, force_logout=False): + return (user, force_logout) + + t = None + try: + t = fn_task_parameters.apply_async(args=["user"], kwargs={"force_logout": True}) + except ValueError: + traces = self.pop_traces() + assert 1 == len(traces) + assert traces[0][0].name == "celery.apply" + assert traces[0][0].resource == "tests.contrib.celery.test_integration.fn_task_parameters" + assert traces[0][0].get_tag("celery.action") == "apply_async" + assert traces[0][0].get_tag("component") == "celery" + assert traces[0][0].get_tag("span.kind") == "producer" + # Internal library errors get recorded on the span + assert traces[0][0].error == 1 + assert traces[0][0].get_tag("error.type") == "builtins.ValueError" + assert "ValueError" in traces[0][0].get_tag("error.stack") + # apply_async runs into an internal error (ValueError) so nothing is returned to t + assert t is None + def test_shared_task(self): # Ensure Django Shared Task are supported @celery.shared_task