Skip to content

Commit

Permalink
commands/cluster: use pipeline to execute split commands (#2230)
Browse files Browse the repository at this point in the history
- allow passing target_nodes to pipeline commands

- move READ_COMMANDS to commands/cluster to avoid import cycle
- add types to list_or_args
  • Loading branch information
utkarshgupta137 authored Jun 27, 2022
1 parent 11cf66a commit d7d4336
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 153 deletions.
17 changes: 11 additions & 6 deletions redis/asyncio/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from redis.cluster import (
PIPELINE_BLOCKED_COMMANDS,
PRIMARY,
READ_COMMANDS,
REPLICA,
SLOT_ID,
AbstractRedisCluster,
Expand All @@ -32,7 +31,7 @@
get_node_name,
parse_cluster_slots,
)
from redis.commands import AsyncRedisClusterCommands
from redis.commands import READ_COMMANDS, AsyncRedisClusterCommands
from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
from redis.exceptions import (
AskError,
Expand Down Expand Up @@ -1350,11 +1349,17 @@ async def _execute(

nodes = {}
for cmd in todo:
target_nodes = await client._determine_nodes(*cmd.args)
if not target_nodes:
raise RedisClusterException(
f"No targets were found to execute {cmd.args} command on"
passed_targets = cmd.kwargs.pop("target_nodes", None)
if passed_targets and not client._is_node_flag(passed_targets):
target_nodes = client._parse_target_nodes(passed_targets)
else:
target_nodes = await client._determine_nodes(
*cmd.args, node_flag=passed_targets
)
if not target_nodes:
raise RedisClusterException(
f"No targets were found to execute {cmd.args} command on"
)
if len(target_nodes) > 1:
raise RedisClusterException(f"Too many targets for command {cmd.args}")

Expand Down
65 changes: 15 additions & 50 deletions redis/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from typing import Any, Callable, Dict, Tuple

from redis.client import CaseInsensitiveDict, PubSub, Redis, parse_scan
from redis.commands import CommandsParser, RedisClusterCommands
from redis.commands import READ_COMMANDS, CommandsParser, RedisClusterCommands
from redis.connection import ConnectionPool, DefaultParser, Encoder, parse_url
from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
from redis.exceptions import (
Expand Down Expand Up @@ -154,52 +154,6 @@ def parse_cluster_shards(resp, **options):
)
KWARGS_DISABLED_KEYS = ("host", "port")

# Not complete, but covers the major ones
# https://redis.io/commands
READ_COMMANDS = frozenset(
[
"BITCOUNT",
"BITPOS",
"EXISTS",
"GEODIST",
"GEOHASH",
"GEOPOS",
"GEORADIUS",
"GEORADIUSBYMEMBER",
"GET",
"GETBIT",
"GETRANGE",
"HEXISTS",
"HGET",
"HGETALL",
"HKEYS",
"HLEN",
"HMGET",
"HSTRLEN",
"HVALS",
"KEYS",
"LINDEX",
"LLEN",
"LRANGE",
"MGET",
"PTTL",
"RANDOMKEY",
"SCARD",
"SDIFF",
"SINTER",
"SISMEMBER",
"SMEMBERS",
"SRANDMEMBER",
"STRLEN",
"SUNION",
"TTL",
"ZCARD",
"ZCOUNT",
"ZRANGE",
"ZSCORE",
]
)


def cleanup_kwargs(**kwargs):
"""
Expand Down Expand Up @@ -1993,14 +1947,25 @@ def _send_cluster_commands(
# refer to our internal node -> slot table that
# tells us where a given
# command should route to.
node = self._determine_nodes(*c.args)
passed_targets = c.options.pop("target_nodes", None)
if passed_targets and not self._is_nodes_flag(passed_targets):
target_nodes = self._parse_target_nodes(passed_targets)
else:
target_nodes = self._determine_nodes(*c.args, node_flag=passed_targets)
if not target_nodes:
raise RedisClusterException(
f"No targets were found to execute {c.args} command on"
)
if len(target_nodes) > 1:
raise RedisClusterException(f"Too many targets for command {c.args}")

node = target_nodes[0]
# now that we know the name of the node
# ( it's just a string in the form of host:port )
# we can build a list of commands for each node.
node_name = node[0].name
node_name = node.name
if node_name not in nodes:
redis_node = self.get_redis_connection(node[0])
redis_node = self.get_redis_connection(node)
connection = get_connection(redis_node, c.args)
nodes[node_name] = NodeCommands(
redis_node.parse_response, redis_node.connection_pool, connection
Expand Down
13 changes: 7 additions & 6 deletions redis/commands/__init__.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
from .cluster import AsyncRedisClusterCommands, RedisClusterCommands
from .cluster import READ_COMMANDS, AsyncRedisClusterCommands, RedisClusterCommands
from .core import AsyncCoreCommands, CoreCommands
from .helpers import list_or_args
from .parser import CommandsParser
from .redismodules import AsyncRedisModuleCommands, RedisModuleCommands
from .sentinel import AsyncSentinelCommands, SentinelCommands

__all__ = [
"AsyncCoreCommands",
"AsyncRedisClusterCommands",
"RedisClusterCommands",
"AsyncRedisModuleCommands",
"AsyncSentinelCommands",
"CommandsParser",
"AsyncCoreCommands",
"CoreCommands",
"list_or_args",
"AsyncRedisModuleCommands",
"READ_COMMANDS",
"RedisClusterCommands",
"RedisModuleCommands",
"AsyncSentinelCommands",
"SentinelCommands",
"list_or_args",
]
Loading

0 comments on commit d7d4336

Please sign in to comment.