Skip to content

Commit

Permalink
Unwrap Celery's ExceptionInfo (#1863)
Browse files Browse the repository at this point in the history
* Unwrap `ExceptionInfo` and `ExceptionWithTraceback`

Instead of reporting the `ExceptionInfo` and `ExceptionWithTraceback`
wrappers raised by Celery, report the exceptions that they wrap.

This ensures that the exception in the OpenTelemetry span has a type
and traceback that are meaningful and relevant to the developer.

* Fix typo

The exception is expected, not excepted. Well, I guess it is also
excepted, because it's an exception, but you get what I mean.

* Reformat file with `black`

Reformat the `__init__.py` file in the Celery instrumentation using
`black`, fixing a CI linter error.

* Address review feedback

Use the VERSION attribute exposed by Billiard to decide whether to
import ExceptionWithTraceback.

Add a test for a failing task and check that the exceptions' type
and message are preserved.

* Amend ExceptionWithTraceback import
  • Loading branch information
unflxw authored Sep 3, 2023
1 parent a02d98c commit 6f3aead
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#1889](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1889))
- Fixed union typing error not compatible with Python 3.7 introduced in `opentelemetry-util-http`, fix tests introduced by patch related to sanitize method for wsgi
([#1913](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1913))
- `opentelemetry-instrumentation-celery` Unwrap Celery's `ExceptionInfo` errors and report the actual exception that was raised. ([#1863](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1863))

### Added

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def add(x, y):
from timeit import default_timer
from typing import Collection, Iterable

from billiard.einfo import ExceptionInfo
from celery import signals # pylint: disable=no-name-in-module

from opentelemetry import trace
Expand All @@ -75,6 +76,13 @@ def add(x, y):
from opentelemetry.propagators.textmap import Getter
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace.status import Status, StatusCode
from billiard import VERSION


if VERSION >= (4, 0, 1):
from billiard.einfo import ExceptionWithTraceback
else:
ExceptionWithTraceback = None

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -271,6 +279,18 @@ def _trace_failure(*args, **kwargs):
return

if ex is not None:
# Unwrap the actual exception wrapped by billiard's
# `ExceptionInfo` and `ExceptionWithTraceback`.
if isinstance(ex, ExceptionInfo) and ex.exception is not None:
ex = ex.exception

if (
ExceptionWithTraceback is not None
and isinstance(ex, ExceptionWithTraceback)
and ex.exc is not None
):
ex = ex.exc

status_kwargs["description"] = str(ex)
span.record_exception(ex)
span.set_status(Status(**status_kwargs))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import logging

from celery import registry # pylint: disable=no-name-in-module
from billiard import VERSION

from opentelemetry.semconv.trace import SpanAttributes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ class Config:
app.config_from_object(Config)


class CustomError(Exception):
pass


@app.task
def task_add(num_a, num_b):
return num_a + num_b


@app.task
def task_raises():
raise CustomError("The task failed!")
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.test.test_base import TestBase
from opentelemetry.trace import SpanKind
from opentelemetry.trace import SpanKind, StatusCode

from .celery_test_tasks import app, task_add
from .celery_test_tasks import app, task_add, task_raises


class TestCeleryInstrumentation(TestBase):
Expand Down Expand Up @@ -66,6 +66,10 @@ def test_task(self):
},
)

self.assertEqual(consumer.status.status_code, StatusCode.UNSET)

self.assertEqual(0, len(consumer.events))

self.assertEqual(
producer.name, "apply_async/tests.celery_test_tasks.task_add"
)
Expand All @@ -84,6 +88,70 @@ def test_task(self):
self.assertEqual(consumer.parent.span_id, producer.context.span_id)
self.assertEqual(consumer.context.trace_id, producer.context.trace_id)

def test_task_raises(self):
CeleryInstrumentor().instrument()

result = task_raises.delay()

timeout = time.time() + 60 * 1 # 1 minutes from now
while not result.ready():
if time.time() > timeout:
break
time.sleep(0.05)

spans = self.sorted_spans(self.memory_exporter.get_finished_spans())
self.assertEqual(len(spans), 2)

consumer, producer = spans

self.assertEqual(
consumer.name, "run/tests.celery_test_tasks.task_raises"
)
self.assertEqual(consumer.kind, SpanKind.CONSUMER)
self.assertSpanHasAttributes(
consumer,
{
"celery.action": "run",
"celery.state": "FAILURE",
SpanAttributes.MESSAGING_DESTINATION: "celery",
"celery.task_name": "tests.celery_test_tasks.task_raises",
},
)

self.assertEqual(consumer.status.status_code, StatusCode.ERROR)

self.assertEqual(1, len(consumer.events))
event = consumer.events[0]

self.assertIn(SpanAttributes.EXCEPTION_STACKTRACE, event.attributes)

self.assertEqual(
event.attributes[SpanAttributes.EXCEPTION_TYPE], "CustomError"
)

self.assertEqual(
event.attributes[SpanAttributes.EXCEPTION_MESSAGE],
"The task failed!",
)

self.assertEqual(
producer.name, "apply_async/tests.celery_test_tasks.task_raises"
)
self.assertEqual(producer.kind, SpanKind.PRODUCER)
self.assertSpanHasAttributes(
producer,
{
"celery.action": "apply_async",
"celery.task_name": "tests.celery_test_tasks.task_raises",
SpanAttributes.MESSAGING_DESTINATION_KIND: "queue",
SpanAttributes.MESSAGING_DESTINATION: "celery",
},
)

self.assertNotEqual(consumer.parent, producer.context)
self.assertEqual(consumer.parent.span_id, producer.context.span_id)
self.assertEqual(consumer.context.trace_id, producer.context.trace_id)

def test_uninstrument(self):
CeleryInstrumentor().instrument()
CeleryInstrumentor().uninstrument()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ def fn_exception():
assert len(span.events) == 1
event = span.events[0]
assert event.name == "exception"
assert event.attributes[SpanAttributes.EXCEPTION_TYPE] == "ExceptionInfo"
assert event.attributes[SpanAttributes.EXCEPTION_TYPE] == "Exception"
assert SpanAttributes.EXCEPTION_MESSAGE in event.attributes
assert (
span.attributes.get(SpanAttributes.MESSAGING_MESSAGE_ID)
Expand Down Expand Up @@ -420,7 +420,7 @@ def run(self):
assert "Task class is failing" in span.status.description


def test_class_task_exception_excepted(celery_app, memory_exporter):
def test_class_task_exception_expected(celery_app, memory_exporter):
class BaseTask(celery_app.Task):
throws = (MyException,)

Expand Down

0 comments on commit 6f3aead

Please sign in to comment.