Skip to content

Add support for JSON, TIMESERIES, BLOOM & GRAPH commands in cluster #2032

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Mar 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion redis/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ class RedisCluster(RedisClusterCommands):
"READONLY",
"READWRITE",
"TIME",
"GRAPH.CONFIG",
],
DEFAULT_NODE,
),
Expand Down Expand Up @@ -810,6 +811,10 @@ def lock(
thread_local=thread_local,
)

def set_response_callback(self, command, callback):
"""Set a custom Response Callback"""
self.cluster_response_callbacks[command] = callback

def _determine_nodes(self, *args, **kwargs):
command = args[0]
nodes_flag = kwargs.pop("nodes_flag", None)
Expand Down Expand Up @@ -1181,6 +1186,20 @@ def _process_result(self, command, res, **kwargs):
else:
return res

def load_external_module(
self,
funcname,
func,
):
"""
This function can be used to add externally defined redis modules,
and their namespaces to the redis client.

``funcname`` - A string containing the name of the function to create
``func`` - The function, being added to this class.
"""
setattr(self, funcname, func)


class ClusterNode:
def __init__(self, host, port, server_type=None, redis_connection=None):
Expand Down Expand Up @@ -2026,7 +2045,13 @@ def _send_cluster_commands(

# turn the response back into a simple flat array that corresponds
# to the sequence of commands issued in the stack in pipeline.execute()
response = [c.result for c in sorted(stack, key=lambda x: x.position)]
response = []
for c in sorted(stack, key=lambda x: x.position):
if c.args[0] in self.cluster_response_callbacks:
c.result = self.cluster_response_callbacks[c.args[0]](
c.result, **c.options
)
response.append(c.result)

if raise_on_error:
self.raise_first_error(stack)
Expand All @@ -2040,6 +2065,9 @@ def _fail_on_redirect(self, allow_redirections):
"ASK & MOVED redirection not allowed in this pipeline"
)

def exists(self, *keys):
return self.execute_command("EXISTS", *keys)

def eval(self):
""" """
raise RedisClusterException("method eval() is not implemented")
Expand Down
2 changes: 2 additions & 0 deletions redis/commands/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
ScriptCommands,
)
from .helpers import list_or_args
from .redismodules import RedisModuleCommands


class ClusterMultiKeyCommands:
Expand Down Expand Up @@ -212,6 +213,7 @@ class RedisClusterCommands(
PubSubCommands,
ClusterDataAccessCommands,
ScriptCommands,
RedisModuleCommands,
):
"""
A class for all Redis Cluster commands
Expand Down
30 changes: 24 additions & 6 deletions redis/commands/json/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,34 @@ def pipeline(self, transaction=True, shard_hint=None):
pipe.jsonget('foo')
pipe.jsonget('notakey')
"""
p = Pipeline(
connection_pool=self.client.connection_pool,
response_callbacks=self.MODULE_CALLBACKS,
transaction=transaction,
shard_hint=shard_hint,
)
if isinstance(self.client, redis.RedisCluster):
p = ClusterPipeline(
nodes_manager=self.client.nodes_manager,
commands_parser=self.client.commands_parser,
startup_nodes=self.client.nodes_manager.startup_nodes,
result_callbacks=self.client.result_callbacks,
cluster_response_callbacks=self.client.cluster_response_callbacks,
cluster_error_retry_attempts=self.client.cluster_error_retry_attempts,
read_from_replicas=self.client.read_from_replicas,
reinitialize_steps=self.client.reinitialize_steps,
)

else:
p = Pipeline(
connection_pool=self.client.connection_pool,
response_callbacks=self.MODULE_CALLBACKS,
transaction=transaction,
shard_hint=shard_hint,
)

p._encode = self._encode
p._decode = self._decode
return p


class ClusterPipeline(JSONCommands, redis.cluster.ClusterPipeline):
"""Cluster pipeline for the module."""


class Pipeline(JSONCommands, redis.client.Pipeline):
"""Pipeline for the module."""
9 changes: 8 additions & 1 deletion redis/commands/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@ def __init__(self, redis_connection):
self.initialize(redis_connection)

def initialize(self, r):
self.commands = r.execute_command("COMMAND")
commands = r.execute_command("COMMAND")
uppercase_commands = []
for cmd in commands:
if any(x.isupper() for x in cmd):
uppercase_commands.append(cmd)
for cmd in uppercase_commands:
commands[cmd.lower()] = commands.pop(cmd)
self.commands = commands

# As soon as this PR is merged into Redis, we should reimplement
# our logic to use COMMAND INFO changes to determine the key positions
Expand Down
31 changes: 24 additions & 7 deletions redis/commands/timeseries/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import redis.client
import redis

from ..helpers import parse_to_list
from .commands import (
Expand Down Expand Up @@ -67,14 +67,31 @@ def pipeline(self, transaction=True, shard_hint=None):
pipeline.execute()

"""
p = Pipeline(
connection_pool=self.client.connection_pool,
response_callbacks=self.MODULE_CALLBACKS,
transaction=transaction,
shard_hint=shard_hint,
)
if isinstance(self.client, redis.RedisCluster):
p = ClusterPipeline(
nodes_manager=self.client.nodes_manager,
commands_parser=self.client.commands_parser,
startup_nodes=self.client.nodes_manager.startup_nodes,
result_callbacks=self.client.result_callbacks,
cluster_response_callbacks=self.client.cluster_response_callbacks,
cluster_error_retry_attempts=self.client.cluster_error_retry_attempts,
read_from_replicas=self.client.read_from_replicas,
reinitialize_steps=self.client.reinitialize_steps,
)

else:
p = Pipeline(
connection_pool=self.client.connection_pool,
response_callbacks=self.MODULE_CALLBACKS,
transaction=transaction,
shard_hint=shard_hint,
)
return p


class ClusterPipeline(TimeSeriesCommands, redis.cluster.ClusterPipeline):
"""Cluster pipeline for the module."""


class Pipeline(TimeSeriesCommands, redis.client.Pipeline):
"""Pipeline for the module."""
1 change: 1 addition & 0 deletions tests/test_bloom.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ def test_cms(client):


@pytest.mark.redismod
@pytest.mark.onlynoncluster
def test_cms_merge(client):
assert client.cms().initbydim("A", 1000, 5)
assert client.cms().initbydim("B", 1000, 5)
Expand Down
1 change: 1 addition & 0 deletions tests/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ def test_config(client):


@pytest.mark.redismod
@pytest.mark.onlynoncluster
def test_list_keys(client):
result = client.graph().list_keys()
assert result == []
Expand Down
3 changes: 3 additions & 0 deletions tests/test_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

from .conftest import default_redismod_url, skip_ifmodversion_lt

pytestmark = pytest.mark.onlynoncluster


WILL_PLAY_TEXT = os.path.abspath(
os.path.join(os.path.dirname(__file__), "testdata", "will_play_text.csv.bz2")
)
Expand Down
5 changes: 5 additions & 0 deletions tests/test_timeseries.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ def test_rev_range(client):


@pytest.mark.redismod
@pytest.mark.onlynoncluster
def testMultiRange(client):
client.ts().create(1, labels={"Test": "This", "team": "ny"})
client.ts().create(2, labels={"Test": "This", "Taste": "That", "team": "sf"})
Expand Down Expand Up @@ -293,6 +294,7 @@ def testMultiRange(client):


@pytest.mark.redismod
@pytest.mark.onlynoncluster
@skip_ifmodversion_lt("99.99.99", "timeseries")
def test_multi_range_advanced(client):
client.ts().create(1, labels={"Test": "This", "team": "ny"})
Expand Down Expand Up @@ -349,6 +351,7 @@ def test_multi_range_advanced(client):


@pytest.mark.redismod
@pytest.mark.onlynoncluster
@skip_ifmodversion_lt("99.99.99", "timeseries")
def test_multi_reverse_range(client):
client.ts().create(1, labels={"Test": "This", "team": "ny"})
Expand Down Expand Up @@ -442,6 +445,7 @@ def test_get(client):


@pytest.mark.redismod
@pytest.mark.onlynoncluster
def test_mget(client):
client.ts().create(1, labels={"Test": "This"})
client.ts().create(2, labels={"Test": "This", "Taste": "That"})
Expand Down Expand Up @@ -483,6 +487,7 @@ def testInfoDuplicatePolicy(client):


@pytest.mark.redismod
@pytest.mark.onlynoncluster
def test_query_index(client):
client.ts().create(1, labels={"Test": "This"})
client.ts().create(2, labels={"Test": "This", "Taste": "That"})
Expand Down
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ setenv =
commands =
standalone: pytest --cov=./ --cov-report=xml:coverage_redis.xml -W always -m 'not onlycluster' {posargs}
standalone-uvloop: pytest --cov=./ --cov-report=xml:coverage_redis.xml -W always -m 'not onlycluster' --uvloop {posargs}
cluster: pytest --cov=./ --cov-report=xml:coverage_cluster.xml -W always -m 'not onlynoncluster and not redismod' --redis-url={env:CLUSTER_URL:} {posargs}
cluster: pytest --cov=./ --cov-report=xml:coverage_cluster.xml -W always -m 'not onlynoncluster and not redismod' --redis-url={env:CLUSTER_URL:} --redismod-url={env:CLUSTER_URL:} {posargs}
cluster-uvloop: pytest --cov=./ --cov-report=xml:coverage_redis.xml -W always -m 'not onlycluster' --uvloop {posargs}

[testenv:redis5]
Expand Down