Skip to content

Commit

Permalink
feat(ray): Create Ray integration
Browse files Browse the repository at this point in the history
The integration includes performance support. Also, add tests for the integration.

Closes getsentry#2400
  • Loading branch information
glowskir authored and szokeasaurusrex committed May 6, 2024
1 parent b24c1e4 commit 44e3d24
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 0 deletions.
1 change: 1 addition & 0 deletions scripts/split-tox-gh-actions/split-tox-gh-actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
"langchain",
"openai",
"huggingface_hub",
"ray",
"rq",
],
"Databases": [
Expand Down
85 changes: 85 additions & 0 deletions sentry_sdk/integrations/ray.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from sentry_sdk.integrations import DidNotEnable, Integration

try:
import ray # type: ignore[import-not-found]
except ImportError:
raise DidNotEnable("Ray not installed.")
import functools

from sentry_sdk.tracing import TRANSACTION_SOURCE_TASK
import logging
import sentry_sdk
from importlib.metadata import version

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from collections.abc import Callable
from typing import Any, Optional


def _check_sentry_initialized():
# type: () -> None
if sentry_sdk.Hub.current.client:
return
# we cannot use sentry sdk logging facilities because it wasn't initialized
logger = logging.getLogger("sentry_sdk.errors")
logger.warning(
"[Tracing] Sentry not initialized in ray cluster worker, performance data will be discarded."
)


def _patch_ray_remote():
# type: () -> None
old_remote = ray.remote

@functools.wraps(old_remote)
def new_remote(f, *args, **kwargs):
# type: (Callable[..., Any], *Any, **Any) -> Callable[..., Any]
def _f(*f_args, _tracing=None, **f_kwargs):
# type: (Any, Optional[dict[str, Any]], Any) -> Any
_check_sentry_initialized()
transaction = None
if _tracing is not None:
transaction = sentry_sdk.continue_trace(
_tracing,
op="ray.remote.receive",
source=TRANSACTION_SOURCE_TASK,
name="Ray worker transaction",
)
with sentry_sdk.start_transaction(transaction) as tx:
result = f(*f_args, **f_kwargs)
tx.set_status("ok")
return result

rv = old_remote(_f, *args, *kwargs)
old_remote_method = rv.remote

def _remote_method_with_header_propagation(*args, **kwargs):
# type: (*Any, **Any) -> Any
with sentry_sdk.start_span(
op="ray.remote.send", description="Sending task to ray cluster."
):
tracing = {
k: v
for k, v in sentry_sdk.Hub.current.iter_trace_propagation_headers()
}
return old_remote_method(*args, **kwargs, _tracing=tracing)

rv.remote = _remote_method_with_header_propagation

return rv

ray.remote = new_remote
return


class RayIntegration(Integration):
identifier = "ray"

@staticmethod
def setup_once():
# type: () -> None
if tuple(int(x) for x in version("ray").split(".")) < (2, 7, 0):
raise DidNotEnable("Ray 2.7.0 or newer required")
_patch_ray_remote()
3 changes: 3 additions & 0 deletions tests/integrations/ray/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import pytest

pytest.importorskip("ray")
57 changes: 57 additions & 0 deletions tests/integrations/ray/test_ray.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import time

import ray

import sentry_sdk
from sentry_sdk.envelope import Envelope
from sentry_sdk.integrations.ray import RayIntegration
from tests.conftest import TestTransport


class RayTestTransport(TestTransport):
def __init__(self):
self.events = []
self.envelopes = []
super().__init__(self.events.append, self.envelopes.append)


def _setup_ray_sentry():
sentry_sdk.init(
traces_sample_rate=1.0,
integrations=[RayIntegration()],
transport=RayTestTransport(),
)


def test_ray():
_setup_ray_sentry()

@ray.remote
def _task():
with sentry_sdk.start_span(op="task", description="example task step"):
time.sleep(0.1)
return sentry_sdk.Hub.current.client.transport.envelopes

ray.init(
runtime_env=dict(worker_process_setup_hook=_setup_ray_sentry, working_dir="./")
)

with sentry_sdk.start_transaction(op="task", name="ray test transaction"):
worker_envelopes = ray.get(_task.remote())

_assert_envelopes_are_associated_with_same_trace_id(
sentry_sdk.Hub.current.client.transport.envelopes[0], worker_envelopes[0]
)


def _assert_envelopes_are_associated_with_same_trace_id(
client_side_envelope: Envelope, worker_envelope: Envelope
):
client_side_envelope_dict = client_side_envelope.get_transaction_event()
worker_envelope_dict = worker_envelope.get_transaction_event()
trace_id = client_side_envelope_dict["contexts"]["trace"]["trace_id"]
for span in client_side_envelope_dict["spans"]:
assert span["trace_id"] == trace_id
for span in worker_envelope_dict["spans"]:
assert span["trace_id"] == trace_id
assert worker_envelope_dict["contexts"]["trace"]["trace_id"] == trace_id
7 changes: 7 additions & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ envlist =
{py3.8,py3.11,py3.12}-quart-v{0.19}
{py3.8,py3.11,py3.12}-quart-latest

# Ray
{py3.10,py3.11}-ray

# Redis
{py3.6,py3.8}-redis-v{3}
{py3.7,py3.8,py3.11}-redis-v{4}
Expand Down Expand Up @@ -494,6 +497,9 @@ deps =
pyramid-v2.0: pyramid~=2.0.0
pyramid-latest: pyramid

# Ray
ray: ray>=2.7.0

# Quart
quart: quart-auth
quart: pytest-asyncio
Expand Down Expand Up @@ -638,6 +644,7 @@ setenv =
pymongo: TESTPATH=tests/integrations/pymongo
pyramid: TESTPATH=tests/integrations/pyramid
quart: TESTPATH=tests/integrations/quart
ray: TESTPATH=tests/integrations/ray
redis: TESTPATH=tests/integrations/redis
rediscluster: TESTPATH=tests/integrations/rediscluster
requests: TESTPATH=tests/integrations/requests
Expand Down

0 comments on commit 44e3d24

Please sign in to comment.