Skip to content

Commit

Permalink
feat(integrations): add support for cluster clients from redis sdk (#…
Browse files Browse the repository at this point in the history
…2394)

This change adds support for cluster clients from the redis sdk (as opposed to the rediscluster library).

This has also been tested in my own app which uses clusters (but not asyncio clusters).

Fixes GH-2523

* feat(integrations): add support for cluster clients from redis sdk

* fix: review round 1

* fix: explicit `is not None` checks

* fix: explicit `is not None` checks, take 2

* fix: add try/except to _set_db_data

* fix: handle additional spans and breadcrumbs caused by rediscluster initialization

* fix: typing for redis integration

* fix: simplify assertions

* add `capture_internal_exceptions`

Co-authored-by: Matthieu Devlin <matt@zumper.com>

* rerun CI

---------

Co-authored-by: Daniel Szoke <szokeasaurusrex@users.noreply.github.com>
  • Loading branch information
md384 and szokeasaurusrex authored Dec 7, 2023
1 parent 22bdc4d commit 75f89b8
Show file tree
Hide file tree
Showing 6 changed files with 435 additions and 41 deletions.
151 changes: 126 additions & 25 deletions sentry_sdk/integrations/redis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@
)

if TYPE_CHECKING:
from collections.abc import Callable
from typing import Any, Dict, Sequence
from redis import Redis, RedisCluster
from redis.asyncio.cluster import (
RedisCluster as AsyncRedisCluster,
ClusterPipeline as AsyncClusterPipeline,
)
from sentry_sdk.tracing import Span

_SINGLE_KEY_COMMANDS = frozenset(
Expand Down Expand Up @@ -83,8 +89,7 @@ def _set_pipeline_data(
):
# type: (Span, bool, Any, bool, Sequence[Any]) -> None
span.set_tag("redis.is_cluster", is_cluster)
transaction = is_transaction if not is_cluster else False
span.set_tag("redis.transaction", transaction)
span.set_tag("redis.transaction", is_transaction)

commands = []
for i, arg in enumerate(command_stack):
Expand Down Expand Up @@ -118,7 +123,7 @@ def _set_client_data(span, is_cluster, name, *args):
span.set_tag("redis.key", args[0])


def _set_db_data(span, connection_params):
def _set_db_data_on_span(span, connection_params):
# type: (Span, Dict[str, Any]) -> None
span.set_data(SPANDATA.DB_SYSTEM, "redis")

Expand All @@ -135,8 +140,43 @@ def _set_db_data(span, connection_params):
span.set_data(SPANDATA.SERVER_PORT, port)


def patch_redis_pipeline(pipeline_cls, is_cluster, get_command_args_fn):
# type: (Any, bool, Any) -> None
def _set_db_data(span, redis_instance):
# type: (Span, Redis[Any]) -> None
try:
_set_db_data_on_span(span, redis_instance.connection_pool.connection_kwargs)
except AttributeError:
pass # connections_kwargs may be missing in some cases


def _set_cluster_db_data(span, redis_cluster_instance):
# type: (Span, RedisCluster[Any]) -> None
default_node = redis_cluster_instance.get_default_node()
if default_node is not None:
_set_db_data_on_span(
span, {"host": default_node.host, "port": default_node.port}
)


def _set_async_cluster_db_data(span, async_redis_cluster_instance):
# type: (Span, AsyncRedisCluster[Any]) -> None
default_node = async_redis_cluster_instance.get_default_node()
if default_node is not None and default_node.connection_kwargs is not None:
_set_db_data_on_span(span, default_node.connection_kwargs)


def _set_async_cluster_pipeline_db_data(span, async_redis_cluster_pipeline_instance):
# type: (Span, AsyncClusterPipeline[Any]) -> None
with capture_internal_exceptions():
_set_async_cluster_db_data(
span,
# the AsyncClusterPipeline has always had a `_client` attr but it is private so potentially problematic and mypy
# does not recognize it - see https://github.com/redis/redis-py/blame/v5.0.0/redis/asyncio/cluster.py#L1386
async_redis_cluster_pipeline_instance._client, # type: ignore[attr-defined]
)


def patch_redis_pipeline(pipeline_cls, is_cluster, get_command_args_fn, set_db_data_fn):
# type: (Any, bool, Any, Callable[[Span, Any], None]) -> None
old_execute = pipeline_cls.execute

def sentry_patched_execute(self, *args, **kwargs):
Expand All @@ -150,12 +190,12 @@ def sentry_patched_execute(self, *args, **kwargs):
op=OP.DB_REDIS, description="redis.pipeline.execute"
) as span:
with capture_internal_exceptions():
_set_db_data(span, self.connection_pool.connection_kwargs)
set_db_data_fn(span, self)
_set_pipeline_data(
span,
is_cluster,
get_command_args_fn,
self.transaction,
False if is_cluster else self.transaction,
self.command_stack,
)

Expand All @@ -164,8 +204,8 @@ def sentry_patched_execute(self, *args, **kwargs):
pipeline_cls.execute = sentry_patched_execute


def patch_redis_client(cls, is_cluster):
# type: (Any, bool) -> None
def patch_redis_client(cls, is_cluster, set_db_data_fn):
# type: (Any, bool, Callable[[Span, Any], None]) -> None
"""
This function can be used to instrument custom redis client classes or
subclasses.
Expand All @@ -189,11 +229,7 @@ def sentry_patched_execute_command(self, name, *args, **kwargs):
description = description[: integration.max_data_size - len("...")] + "..."

with hub.start_span(op=OP.DB_REDIS, description=description) as span:
try:
_set_db_data(span, self.connection_pool.connection_kwargs)
except AttributeError:
pass # connections_kwargs may be missing in some cases

set_db_data_fn(span, self)
_set_client_data(span, is_cluster, name, *args)

return old_execute_command(self, name, *args, **kwargs)
Expand All @@ -203,14 +239,16 @@ def sentry_patched_execute_command(self, name, *args, **kwargs):

def _patch_redis(StrictRedis, client): # noqa: N803
# type: (Any, Any) -> None
patch_redis_client(StrictRedis, is_cluster=False)
patch_redis_pipeline(client.Pipeline, False, _get_redis_command_args)
patch_redis_client(StrictRedis, is_cluster=False, set_db_data_fn=_set_db_data)
patch_redis_pipeline(client.Pipeline, False, _get_redis_command_args, _set_db_data)
try:
strict_pipeline = client.StrictPipeline
except AttributeError:
pass
else:
patch_redis_pipeline(strict_pipeline, False, _get_redis_command_args)
patch_redis_pipeline(
strict_pipeline, False, _get_redis_command_args, _set_db_data
)

try:
import redis.asyncio
Expand All @@ -222,8 +260,56 @@ def _patch_redis(StrictRedis, client): # noqa: N803
patch_redis_async_pipeline,
)

patch_redis_async_client(redis.asyncio.client.StrictRedis)
patch_redis_async_pipeline(redis.asyncio.client.Pipeline)
patch_redis_async_client(
redis.asyncio.client.StrictRedis,
is_cluster=False,
set_db_data_fn=_set_db_data,
)
patch_redis_async_pipeline(
redis.asyncio.client.Pipeline,
False,
_get_redis_command_args,
set_db_data_fn=_set_db_data,
)


def _patch_redis_cluster():
# type: () -> None
"""Patches the cluster module on redis SDK (as opposed to rediscluster library)"""
try:
from redis import RedisCluster, cluster
except ImportError:
pass
else:
patch_redis_client(RedisCluster, True, _set_cluster_db_data)
patch_redis_pipeline(
cluster.ClusterPipeline,
True,
_parse_rediscluster_command,
_set_cluster_db_data,
)

try:
from redis.asyncio import cluster as async_cluster
except ImportError:
pass
else:
from sentry_sdk.integrations.redis.asyncio import (
patch_redis_async_client,
patch_redis_async_pipeline,
)

patch_redis_async_client(
async_cluster.RedisCluster,
is_cluster=True,
set_db_data_fn=_set_async_cluster_db_data,
)
patch_redis_async_pipeline(
async_cluster.ClusterPipeline,
True,
_parse_rediscluster_command,
set_db_data_fn=_set_async_cluster_pipeline_db_data,
)


def _patch_rb():
Expand All @@ -233,9 +319,15 @@ def _patch_rb():
except ImportError:
pass
else:
patch_redis_client(rb.clients.FanoutClient, is_cluster=False)
patch_redis_client(rb.clients.MappingClient, is_cluster=False)
patch_redis_client(rb.clients.RoutingClient, is_cluster=False)
patch_redis_client(
rb.clients.FanoutClient, is_cluster=False, set_db_data_fn=_set_db_data
)
patch_redis_client(
rb.clients.MappingClient, is_cluster=False, set_db_data_fn=_set_db_data
)
patch_redis_client(
rb.clients.RoutingClient, is_cluster=False, set_db_data_fn=_set_db_data
)


def _patch_rediscluster():
Expand All @@ -245,7 +337,9 @@ def _patch_rediscluster():
except ImportError:
return

patch_redis_client(rediscluster.RedisCluster, is_cluster=True)
patch_redis_client(
rediscluster.RedisCluster, is_cluster=True, set_db_data_fn=_set_db_data
)

# up to v1.3.6, __version__ attribute is a tuple
# from v2.0.0, __version__ is a string and VERSION a tuple
Expand All @@ -255,11 +349,17 @@ def _patch_rediscluster():
# https://github.com/Grokzen/redis-py-cluster/blob/master/docs/release-notes.rst
if (0, 2, 0) < version < (2, 0, 0):
pipeline_cls = rediscluster.pipeline.StrictClusterPipeline
patch_redis_client(rediscluster.StrictRedisCluster, is_cluster=True)
patch_redis_client(
rediscluster.StrictRedisCluster,
is_cluster=True,
set_db_data_fn=_set_db_data,
)
else:
pipeline_cls = rediscluster.pipeline.ClusterPipeline

patch_redis_pipeline(pipeline_cls, True, _parse_rediscluster_command)
patch_redis_pipeline(
pipeline_cls, True, _parse_rediscluster_command, set_db_data_fn=_set_db_data
)


class RedisIntegration(Integration):
Expand All @@ -278,6 +378,7 @@ def setup_once():
raise DidNotEnable("Redis client not installed")

_patch_redis(StrictRedis, client)
_patch_redis_cluster()
_patch_rb()

try:
Expand Down
36 changes: 20 additions & 16 deletions sentry_sdk/integrations/redis/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,25 @@
from sentry_sdk.consts import OP
from sentry_sdk.integrations.redis import (
RedisIntegration,
_get_redis_command_args,
_get_span_description,
_set_client_data,
_set_db_data,
_set_pipeline_data,
)
from sentry_sdk._types import TYPE_CHECKING
from sentry_sdk.tracing import Span
from sentry_sdk.utils import capture_internal_exceptions

if TYPE_CHECKING:
from typing import Any
from collections.abc import Callable
from typing import Any, Union
from redis.asyncio.client import Pipeline, StrictRedis
from redis.asyncio.cluster import ClusterPipeline, RedisCluster


def patch_redis_async_pipeline(pipeline_cls):
# type: (Any) -> None
def patch_redis_async_pipeline(
pipeline_cls, is_cluster, get_command_args_fn, set_db_data_fn
):
# type: (Union[type[Pipeline[Any]], type[ClusterPipeline[Any]]], bool, Any, Callable[[Span, Any], None]) -> None
old_execute = pipeline_cls.execute

async def _sentry_execute(self, *args, **kwargs):
Expand All @@ -32,22 +36,22 @@ async def _sentry_execute(self, *args, **kwargs):
op=OP.DB_REDIS, description="redis.pipeline.execute"
) as span:
with capture_internal_exceptions():
_set_db_data(span, self.connection_pool.connection_kwargs)
set_db_data_fn(span, self)
_set_pipeline_data(
span,
False,
_get_redis_command_args,
self.is_transaction,
self.command_stack,
is_cluster,
get_command_args_fn,
False if is_cluster else self.is_transaction,
self._command_stack if is_cluster else self.command_stack,
)

return await old_execute(self, *args, **kwargs)

pipeline_cls.execute = _sentry_execute
pipeline_cls.execute = _sentry_execute # type: ignore[method-assign]


def patch_redis_async_client(cls):
# type: (Any) -> None
def patch_redis_async_client(cls, is_cluster, set_db_data_fn):
# type: (Union[type[StrictRedis[Any]], type[RedisCluster[Any]]], bool, Callable[[Span, Any], None]) -> None
old_execute_command = cls.execute_command

async def _sentry_execute_command(self, name, *args, **kwargs):
Expand All @@ -60,9 +64,9 @@ async def _sentry_execute_command(self, name, *args, **kwargs):
description = _get_span_description(name, *args)

with hub.start_span(op=OP.DB_REDIS, description=description) as span:
_set_db_data(span, self.connection_pool.connection_kwargs)
_set_client_data(span, False, name, *args)
set_db_data_fn(span, self)
_set_client_data(span, is_cluster, name, *args)

return await old_execute_command(self, name, *args, **kwargs)

cls.execute_command = _sentry_execute_command
cls.execute_command = _sentry_execute_command # type: ignore[method-assign]
3 changes: 3 additions & 0 deletions tests/integrations/redis/cluster/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import pytest

pytest.importorskip("redis.cluster")
Loading

0 comments on commit 75f89b8

Please sign in to comment.