diff --git a/scripts/split-tox-gh-actions/split-tox-gh-actions.py b/scripts/split-tox-gh-actions/split-tox-gh-actions.py index 5d5f423857..66a4835064 100755 --- a/scripts/split-tox-gh-actions/split-tox-gh-actions.py +++ b/scripts/split-tox-gh-actions/split-tox-gh-actions.py @@ -74,6 +74,7 @@ "langchain", "openai", "huggingface_hub", + "ray", "rq", ], "Databases": [ diff --git a/sentry_sdk/integrations/ray.py b/sentry_sdk/integrations/ray.py new file mode 100644 index 0000000000..45d66a8c2d --- /dev/null +++ b/sentry_sdk/integrations/ray.py @@ -0,0 +1,85 @@ +from sentry_sdk.integrations import DidNotEnable, Integration + +try: + import ray # type: ignore[import-not-found] +except ImportError: + raise DidNotEnable("Ray not installed.") +import functools + +from sentry_sdk.tracing import TRANSACTION_SOURCE_TASK +import logging +import sentry_sdk +from importlib.metadata import version + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from collections.abc import Callable + from typing import Any, Optional + + +def _check_sentry_initialized(): + # type: () -> None + if sentry_sdk.Hub.current.client: + return + # we cannot use sentry sdk logging facilities because it wasn't initialized + logger = logging.getLogger("sentry_sdk.errors") + logger.warning( + "[Tracing] Sentry not initialized in ray cluster worker, performance data will be discarded." + ) + + +def _patch_ray_remote(): + # type: () -> None + old_remote = ray.remote + + @functools.wraps(old_remote) + def new_remote(f, *args, **kwargs): + # type: (Callable[..., Any], *Any, **Any) -> Callable[..., Any] + def _f(*f_args, _tracing=None, **f_kwargs): + # type: (Any, Optional[dict[str, Any]], Any) -> Any + _check_sentry_initialized() + transaction = None + if _tracing is not None: + transaction = sentry_sdk.continue_trace( + _tracing, + op="ray.remote.receive", + source=TRANSACTION_SOURCE_TASK, + name="Ray worker transaction", + ) + with sentry_sdk.start_transaction(transaction) as tx: + result = f(*f_args, **f_kwargs) + tx.set_status("ok") + return result + + rv = old_remote(_f, *args, *kwargs) + old_remote_method = rv.remote + + def _remote_method_with_header_propagation(*args, **kwargs): + # type: (*Any, **Any) -> Any + with sentry_sdk.start_span( + op="ray.remote.send", description="Sending task to ray cluster." + ): + tracing = { + k: v + for k, v in sentry_sdk.Hub.current.iter_trace_propagation_headers() + } + return old_remote_method(*args, **kwargs, _tracing=tracing) + + rv.remote = _remote_method_with_header_propagation + + return rv + + ray.remote = new_remote + return + + +class RayIntegration(Integration): + identifier = "ray" + + @staticmethod + def setup_once(): + # type: () -> None + if tuple(int(x) for x in version("ray").split(".")) < (2, 7, 0): + raise DidNotEnable("Ray 2.7.0 or newer required") + _patch_ray_remote() diff --git a/tests/integrations/ray/__init__.py b/tests/integrations/ray/__init__.py new file mode 100644 index 0000000000..92f6d93906 --- /dev/null +++ b/tests/integrations/ray/__init__.py @@ -0,0 +1,3 @@ +import pytest + +pytest.importorskip("ray") diff --git a/tests/integrations/ray/test_ray.py b/tests/integrations/ray/test_ray.py new file mode 100644 index 0000000000..97c0858b54 --- /dev/null +++ b/tests/integrations/ray/test_ray.py @@ -0,0 +1,57 @@ +import time + +import ray + +import sentry_sdk +from sentry_sdk.envelope import Envelope +from sentry_sdk.integrations.ray import RayIntegration +from tests.conftest import TestTransport + + +class RayTestTransport(TestTransport): + def __init__(self): + self.events = [] + self.envelopes = [] + super().__init__(self.events.append, self.envelopes.append) + + +def _setup_ray_sentry(): + sentry_sdk.init( + traces_sample_rate=1.0, + integrations=[RayIntegration()], + transport=RayTestTransport(), + ) + + +def test_ray(): + _setup_ray_sentry() + + @ray.remote + def _task(): + with sentry_sdk.start_span(op="task", description="example task step"): + time.sleep(0.1) + return sentry_sdk.Hub.current.client.transport.envelopes + + ray.init( + runtime_env=dict(worker_process_setup_hook=_setup_ray_sentry, working_dir="./") + ) + + with sentry_sdk.start_transaction(op="task", name="ray test transaction"): + worker_envelopes = ray.get(_task.remote()) + + _assert_envelopes_are_associated_with_same_trace_id( + sentry_sdk.Hub.current.client.transport.envelopes[0], worker_envelopes[0] + ) + + +def _assert_envelopes_are_associated_with_same_trace_id( + client_side_envelope: Envelope, worker_envelope: Envelope +): + client_side_envelope_dict = client_side_envelope.get_transaction_event() + worker_envelope_dict = worker_envelope.get_transaction_event() + trace_id = client_side_envelope_dict["contexts"]["trace"]["trace_id"] + for span in client_side_envelope_dict["spans"]: + assert span["trace_id"] == trace_id + for span in worker_envelope_dict["spans"]: + assert span["trace_id"] == trace_id + assert worker_envelope_dict["contexts"]["trace"]["trace_id"] == trace_id diff --git a/tox.ini b/tox.ini index f1bc0e7a5e..1697f50ea7 100644 --- a/tox.ini +++ b/tox.ini @@ -185,6 +185,9 @@ envlist = {py3.8,py3.11,py3.12}-quart-v{0.19} {py3.8,py3.11,py3.12}-quart-latest + # Ray + {py3.10,py3.11}-ray + # Redis {py3.6,py3.8}-redis-v{3} {py3.7,py3.8,py3.11}-redis-v{4} @@ -494,6 +497,9 @@ deps = pyramid-v2.0: pyramid~=2.0.0 pyramid-latest: pyramid + # Ray + ray: ray>=2.7.0 + # Quart quart: quart-auth quart: pytest-asyncio @@ -638,6 +644,7 @@ setenv = pymongo: TESTPATH=tests/integrations/pymongo pyramid: TESTPATH=tests/integrations/pyramid quart: TESTPATH=tests/integrations/quart + ray: TESTPATH=tests/integrations/ray redis: TESTPATH=tests/integrations/redis rediscluster: TESTPATH=tests/integrations/rediscluster requests: TESTPATH=tests/integrations/requests