-
Notifications
You must be signed in to change notification settings - Fork 517
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Dramatiq integration from @jacobsvante (#3397)
This is the code from [sentry-dramatiq](https://github.com/jacobsvante/sentry-dramatiq). As described in this GitHub issue (#3387) @jacobsvante, the original maintainer of this integration, is not doing any Python anymore and wants to donate his integration to Sentry so we can take care of it. This PR adds the current version of the `DramatiqIntegration` to our repo. (The original integrations has been ported to the new SDK 2.x API) Fixes #3387 --------- Co-authored-by: Ivana Kellyer <ivana.kellyer@sentry.io>
- Loading branch information
1 parent
7d46709
commit da0392f
Showing
6 changed files
with
423 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -80,6 +80,7 @@ | |
"arq", | ||
"beam", | ||
"celery", | ||
"dramatiq", | ||
"huey", | ||
"rq", | ||
"spark", | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,167 @@ | ||
import json | ||
|
||
import sentry_sdk | ||
from sentry_sdk.integrations import Integration | ||
from sentry_sdk._types import TYPE_CHECKING | ||
from sentry_sdk.integrations._wsgi_common import request_body_within_bounds | ||
from sentry_sdk.utils import ( | ||
AnnotatedValue, | ||
capture_internal_exceptions, | ||
event_from_exception, | ||
) | ||
|
||
from dramatiq.broker import Broker # type: ignore | ||
from dramatiq.message import Message # type: ignore | ||
from dramatiq.middleware import Middleware, default_middleware # type: ignore | ||
from dramatiq.errors import Retry # type: ignore | ||
|
||
if TYPE_CHECKING: | ||
from typing import Any, Callable, Dict, Optional, Union | ||
from sentry_sdk._types import Event, Hint | ||
|
||
|
||
class DramatiqIntegration(Integration): | ||
""" | ||
Dramatiq integration for Sentry | ||
Please make sure that you call `sentry_sdk.init` *before* initializing | ||
your broker, as it monkey patches `Broker.__init__`. | ||
This integration was originally developed and maintained | ||
by https://github.com/jacobsvante and later donated to the Sentry | ||
project. | ||
""" | ||
|
||
identifier = "dramatiq" | ||
|
||
@staticmethod | ||
def setup_once(): | ||
# type: () -> None | ||
_patch_dramatiq_broker() | ||
|
||
|
||
def _patch_dramatiq_broker(): | ||
# type: () -> None | ||
original_broker__init__ = Broker.__init__ | ||
|
||
def sentry_patched_broker__init__(self, *args, **kw): | ||
# type: (Broker, *Any, **Any) -> None | ||
integration = sentry_sdk.get_client().get_integration(DramatiqIntegration) | ||
|
||
try: | ||
middleware = kw.pop("middleware") | ||
except KeyError: | ||
# Unfortunately Broker and StubBroker allows middleware to be | ||
# passed in as positional arguments, whilst RabbitmqBroker and | ||
# RedisBroker does not. | ||
if len(args) == 1: | ||
middleware = args[0] | ||
args = [] # type: ignore | ||
else: | ||
middleware = None | ||
|
||
if middleware is None: | ||
middleware = list(m() for m in default_middleware) | ||
else: | ||
middleware = list(middleware) | ||
|
||
if integration is not None: | ||
middleware = [m for m in middleware if not isinstance(m, SentryMiddleware)] | ||
middleware.insert(0, SentryMiddleware()) | ||
|
||
kw["middleware"] = middleware | ||
original_broker__init__(self, *args, **kw) | ||
|
||
Broker.__init__ = sentry_patched_broker__init__ | ||
|
||
|
||
class SentryMiddleware(Middleware): # type: ignore[misc] | ||
""" | ||
A Dramatiq middleware that automatically captures and sends | ||
exceptions to Sentry. | ||
This is automatically added to every instantiated broker via the | ||
DramatiqIntegration. | ||
""" | ||
|
||
def before_process_message(self, broker, message): | ||
# type: (Broker, Message) -> None | ||
integration = sentry_sdk.get_client().get_integration(DramatiqIntegration) | ||
if integration is None: | ||
return | ||
|
||
message._scope_manager = sentry_sdk.new_scope() | ||
message._scope_manager.__enter__() | ||
|
||
scope = sentry_sdk.get_current_scope() | ||
scope.transaction = message.actor_name | ||
scope.set_extra("dramatiq_message_id", message.message_id) | ||
scope.add_event_processor(_make_message_event_processor(message, integration)) | ||
|
||
def after_process_message(self, broker, message, *, result=None, exception=None): | ||
# type: (Broker, Message, Any, Optional[Any], Optional[Exception]) -> None | ||
integration = sentry_sdk.get_client().get_integration(DramatiqIntegration) | ||
if integration is None: | ||
return | ||
|
||
actor = broker.get_actor(message.actor_name) | ||
throws = message.options.get("throws") or actor.options.get("throws") | ||
|
||
try: | ||
if ( | ||
exception is not None | ||
and not (throws and isinstance(exception, throws)) | ||
and not isinstance(exception, Retry) | ||
): | ||
event, hint = event_from_exception( | ||
exception, | ||
client_options=sentry_sdk.get_client().options, | ||
mechanism={ | ||
"type": DramatiqIntegration.identifier, | ||
"handled": False, | ||
}, | ||
) | ||
sentry_sdk.capture_event(event, hint=hint) | ||
finally: | ||
message._scope_manager.__exit__(None, None, None) | ||
|
||
|
||
def _make_message_event_processor(message, integration): | ||
# type: (Message, DramatiqIntegration) -> Callable[[Event, Hint], Optional[Event]] | ||
|
||
def inner(event, hint): | ||
# type: (Event, Hint) -> Optional[Event] | ||
with capture_internal_exceptions(): | ||
DramatiqMessageExtractor(message).extract_into_event(event) | ||
|
||
return event | ||
|
||
return inner | ||
|
||
|
||
class DramatiqMessageExtractor(object): | ||
def __init__(self, message): | ||
# type: (Message) -> None | ||
self.message_data = dict(message.asdict()) | ||
|
||
def content_length(self): | ||
# type: () -> int | ||
return len(json.dumps(self.message_data)) | ||
|
||
def extract_into_event(self, event): | ||
# type: (Event) -> None | ||
client = sentry_sdk.get_client() | ||
if not client.is_active(): | ||
return | ||
|
||
contexts = event.setdefault("contexts", {}) | ||
request_info = contexts.setdefault("dramatiq", {}) | ||
request_info["type"] = "dramatiq" | ||
|
||
data = None # type: Optional[Union[AnnotatedValue, Dict[str, Any]]] | ||
if not request_body_within_bounds(client, self.content_length()): | ||
data = AnnotatedValue.removed_because_over_size_limit() | ||
else: | ||
data = self.message_data | ||
|
||
request_info["data"] = data |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
import pytest | ||
|
||
pytest.importorskip("dramatiq") |
Oops, something went wrong.