Skip to content

Adding load balancing strategy configuration to cluster clients (replacement for 'read_from_replicas' config) #3563

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 33 additions & 7 deletions redis/asyncio/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
SLOT_ID,
AbstractRedisCluster,
LoadBalancer,
LoadBalancingStrategy,
block_pipeline_command,
get_node_name,
parse_cluster_slots,
Expand Down Expand Up @@ -65,6 +66,7 @@
from redis.typing import AnyKeyT, EncodableT, KeyT
from redis.utils import (
SSL_AVAILABLE,
deprecated_args,
deprecated_function,
get_lib_version,
safe_str,
Expand Down Expand Up @@ -121,9 +123,15 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
| See:
https://redis.io/docs/manual/scaling/#redis-cluster-configuration-parameters
:param read_from_replicas:
| Enable read from replicas in READONLY mode. You can read possibly stale data.
| @deprecated - please use load_balancing_strategy instead
| Enable read from replicas in READONLY mode.
When set to true, read commands will be distributed between the primary and
its replicas in a Round-Robin manner.
The data read from replicas is eventually consistent with the data in primary nodes.
:param load_balancing_strategy:
| Enable read from replicas in READONLY mode and defines the load balancing
strategy that will be used for cluster node selection.
The data read from replicas is eventually consistent with the data in primary nodes.
:param reinitialize_steps:
| Specifies the number of MOVED errors that need to occur before reinitializing
the whole cluster topology. If a MOVED error occurs and the cluster does not
Expand Down Expand Up @@ -216,6 +224,11 @@ def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster":
"result_callbacks",
)

@deprecated_args(
args_to_warn=["read_from_replicas"],
reason="Please configure the 'load_balancing_strategy' instead",
version="5.0.3",
)
def __init__(
self,
host: Optional[str] = None,
Expand All @@ -224,6 +237,7 @@ def __init__(
startup_nodes: Optional[List["ClusterNode"]] = None,
require_full_coverage: bool = True,
read_from_replicas: bool = False,
load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
reinitialize_steps: int = 5,
cluster_error_retry_attempts: int = 3,
connection_error_retry_attempts: int = 3,
Expand Down Expand Up @@ -322,7 +336,7 @@ def __init__(
}
)

if read_from_replicas:
if read_from_replicas or load_balancing_strategy:
# Call our on_connect function to configure READONLY mode
kwargs["redis_connect_func"] = self.on_connect

Expand Down Expand Up @@ -371,6 +385,7 @@ def __init__(
)
self.encoder = Encoder(encoding, encoding_errors, decode_responses)
self.read_from_replicas = read_from_replicas
self.load_balancing_strategy = load_balancing_strategy
self.reinitialize_steps = reinitialize_steps
self.cluster_error_retry_attempts = cluster_error_retry_attempts
self.connection_error_retry_attempts = connection_error_retry_attempts
Expand Down Expand Up @@ -589,6 +604,7 @@ async def _determine_nodes(
self.nodes_manager.get_node_from_slot(
await self._determine_slot(command, *args),
self.read_from_replicas and command in READ_COMMANDS,
self.load_balancing_strategy if command in READ_COMMANDS else None,
)
]

Expand Down Expand Up @@ -769,7 +785,11 @@ async def _execute_command(
# refresh the target node
slot = await self._determine_slot(*args)
target_node = self.nodes_manager.get_node_from_slot(
slot, self.read_from_replicas and args[0] in READ_COMMANDS
slot,
self.read_from_replicas and args[0] in READ_COMMANDS,
self.load_balancing_strategy
if args[0] in READ_COMMANDS
else None,
)
moved = False

Expand Down Expand Up @@ -1231,17 +1251,23 @@ def _update_moved_slots(self) -> None:
self._moved_exception = None

def get_node_from_slot(
self, slot: int, read_from_replicas: bool = False
self,
slot: int,
read_from_replicas: bool = False,
load_balancing_strategy=None,
) -> "ClusterNode":
if self._moved_exception:
self._update_moved_slots()

if read_from_replicas is True and load_balancing_strategy is None:
load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN

try:
if read_from_replicas:
# get the server index in a Round-Robin manner
if len(self.slots_cache[slot]) > 1 and load_balancing_strategy:
# get the server index using the strategy defined in load_balancing_strategy
primary_name = self.slots_cache[slot][0].name
node_idx = self.read_load_balancer.get_server_index(
primary_name, len(self.slots_cache[slot])
primary_name, len(self.slots_cache[slot]), load_balancing_strategy
)
return self.slots_cache[slot][node_idx]
return self.slots_cache[slot][0]
Expand Down
100 changes: 85 additions & 15 deletions redis/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import threading
import time
from collections import OrderedDict
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

from redis._parsers import CommandsParser, Encoder
Expand Down Expand Up @@ -482,6 +483,11 @@ class initializer. In the case of conflicting arguments, querystring
"""
return cls(url=url, **kwargs)

@deprecated_args(
args_to_warn=["read_from_replicas"],
reason="Please configure the 'load_balancing_strategy' instead",
version="5.0.3",
)
def __init__(
self,
host: Optional[str] = None,
Expand All @@ -492,6 +498,7 @@ def __init__(
require_full_coverage: bool = False,
reinitialize_steps: int = 5,
read_from_replicas: bool = False,
load_balancing_strategy: Optional["LoadBalancingStrategy"] = None,
dynamic_startup_nodes: bool = True,
url: Optional[str] = None,
address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
Expand Down Expand Up @@ -520,11 +527,16 @@ def __init__(
cluster client. If not all slots are covered, RedisClusterException
will be thrown.
:param read_from_replicas:
@deprecated - please use load_balancing_strategy instead
Enable read from replicas in READONLY mode. You can read possibly
stale data.
When set to true, read commands will be distributed between the
primary and its replicas in a Round-Robin manner.
:param dynamic_startup_nodes:
:param load_balancing_strategy:
Enable read from replicas in READONLY mode and defines the load balancing
strategy that will be used for cluster node selection.
The data read from replicas is eventually consistent with the data in primary nodes.
:param dynamic_startup_nodes:
Set the RedisCluster's startup nodes to all of the discovered nodes.
If true (default value), the cluster's discovered nodes will be used to
determine the cluster nodes-slots mapping in the next topology refresh.
Expand Down Expand Up @@ -629,6 +641,7 @@ def __init__(
self.command_flags = self.__class__.COMMAND_FLAGS.copy()
self.node_flags = self.__class__.NODE_FLAGS.copy()
self.read_from_replicas = read_from_replicas
self.load_balancing_strategy = load_balancing_strategy
self.reinitialize_counter = 0
self.reinitialize_steps = reinitialize_steps
if event_dispatcher is None:
Expand Down Expand Up @@ -683,7 +696,7 @@ def on_connect(self, connection):
"""
connection.on_connect()

if self.read_from_replicas:
if self.read_from_replicas or self.load_balancing_strategy:
# Sending READONLY command to server to configure connection as
# readonly. Since each cluster node may change its server type due
# to a failover, we should establish a READONLY connection
Expand Down Expand Up @@ -810,6 +823,7 @@ def pipeline(self, transaction=None, shard_hint=None):
cluster_response_callbacks=self.cluster_response_callbacks,
cluster_error_retry_attempts=self.cluster_error_retry_attempts,
read_from_replicas=self.read_from_replicas,
load_balancing_strategy=self.load_balancing_strategy,
reinitialize_steps=self.reinitialize_steps,
lock=self._lock,
)
Expand Down Expand Up @@ -934,7 +948,9 @@ def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]:
# get the node that holds the key's slot
slot = self.determine_slot(*args)
node = self.nodes_manager.get_node_from_slot(
slot, self.read_from_replicas and command in READ_COMMANDS
slot,
self.read_from_replicas and command in READ_COMMANDS,
self.load_balancing_strategy if command in READ_COMMANDS else None,
)
return [node]

Expand Down Expand Up @@ -1158,7 +1174,11 @@ def _execute_command(self, target_node, *args, **kwargs):
# refresh the target node
slot = self.determine_slot(*args)
target_node = self.nodes_manager.get_node_from_slot(
slot, self.read_from_replicas and command in READ_COMMANDS
slot,
self.read_from_replicas and command in READ_COMMANDS,
self.load_balancing_strategy
if command in READ_COMMANDS
else None,
)
moved = False

Expand Down Expand Up @@ -1307,6 +1327,12 @@ def __del__(self):
self.redis_connection.close()


class LoadBalancingStrategy(Enum):
    """
    Strategies for choosing which node of a slot serves a read command.

    Index 0 of a slot's node list is the primary; the remaining entries
    are its replicas.
    """

    # Rotate over every node of the slot, the primary included.
    ROUND_ROBIN = "round_robin"
    # Rotate over the replicas only, skipping the primary.
    ROUND_ROBIN_REPLICAS = "round_robin_replicas"
    # Pick one of the replicas at random on every call.
    RANDOM_REPLICA = "random_replica"


class LoadBalancer:
"""
Load balancing of read requests between a primary and its replicas
(Round-Robin or random replica selection)
Expand All @@ -1316,15 +1342,38 @@ def __init__(self, start_index: int = 0) -> None:
self.primary_to_idx = {}
self.start_index = start_index

def get_server_index(self, primary: str, list_size: int) -> int:
server_index = self.primary_to_idx.setdefault(primary, self.start_index)
# Update the index
self.primary_to_idx[primary] = (server_index + 1) % list_size
return server_index
def get_server_index(
    self,
    primary: str,
    list_size: int,
    load_balancing_strategy: LoadBalancingStrategy = LoadBalancingStrategy.ROUND_ROBIN,
) -> int:
    """
    Pick the index of the node that should serve the next request.

    :param primary: name of the primary node; keys the round-robin cursor.
    :param list_size: number of nodes available for the slot.
    :param load_balancing_strategy: how the index is chosen; defaults to
        round-robin over all nodes.
    :return: index into the slot's node list.
    """
    # Random selection keeps no per-primary state, so handle it up front.
    if load_balancing_strategy == LoadBalancingStrategy.RANDOM_REPLICA:
        return self._get_random_replica_index(list_size)

    # Every other strategy is a round-robin; only the replicas-only variant
    # asks the helper to skip the primary.
    skip_primary = (
        load_balancing_strategy == LoadBalancingStrategy.ROUND_ROBIN_REPLICAS
    )
    return self._get_round_robin_index(primary, list_size, skip_primary)

def reset(self) -> None:
    """
    Drop every stored per-primary cursor.

    The mapping is emptied in place, so the next lookup for any primary
    starts again from ``start_index``.
    """
    self.primary_to_idx.clear()

def _get_random_replica_index(self, list_size: int) -> int:
    """
    Return the index of a randomly chosen replica.

    Index 0 is the primary, so replicas occupy ``1 .. list_size - 1``.

    :param list_size: number of nodes serving the slot, primary included.
    :return: a replica index, or 0 (the primary) when there is no replica.
    """
    if list_size < 2:
        # No replica to choose from: random.randint(1, 0) would raise
        # ValueError, so fall back to the primary instead.
        return 0
    # randrange excludes the upper bound, i.e. the same range as
    # randint(1, list_size - 1).
    return random.randrange(1, list_size)

def _get_round_robin_index(
    self, primary: str, list_size: int, replicas_only: bool
) -> int:
    """
    Return the next index for ``primary`` in Round-Robin order.

    :param primary: name of the primary node; keys the stored cursor.
    :param list_size: number of nodes serving the slot, primary included.
    :param replicas_only: when true, index 0 (the primary) is skipped as
        long as at least one replica exists.
    :return: an index in ``range(list_size)``.
    """
    # Wrap the stored cursor so a value that is too large for the current
    # list (e.g. the list is shorter than on the previous call, or
    # start_index exceeds it) can never point past the end.
    server_index = (
        self.primary_to_idx.setdefault(primary, self.start_index) % list_size
    )
    if replicas_only and server_index == 0 and list_size > 1:
        # skip the primary node index; with a single node there is no
        # replica to skip to, so the primary is returned instead
        server_index = 1
    # Update the index for the next round
    self.primary_to_idx[primary] = (server_index + 1) % list_size
    return server_index


class NodesManager:
def __init__(
Expand Down Expand Up @@ -1428,7 +1477,21 @@ def _update_moved_slots(self):
# Reset moved_exception
self._moved_exception = None

def get_node_from_slot(self, slot, read_from_replicas=False, server_type=None):
@deprecated_args(
args_to_warn=["server_type"],
reason=(
"In case you need select some load balancing strategy "
"that will use replicas, please set it through 'load_balancing_strategy'"
),
version="5.0.3",
)
def get_node_from_slot(
self,
slot,
read_from_replicas=False,
load_balancing_strategy=None,
server_type=None,
):
"""
Gets a node that serves this hash slot
"""
Expand All @@ -1443,11 +1506,14 @@ def get_node_from_slot(self, slot, read_from_replicas=False, server_type=None):
f'"require_full_coverage={self._require_full_coverage}"'
)

if read_from_replicas is True:
# get the server index in a Round-Robin manner
if read_from_replicas is True and load_balancing_strategy is None:
load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN

if len(self.slots_cache[slot]) > 1 and load_balancing_strategy:
# get the server index using the strategy defined in load_balancing_strategy
primary_name = self.slots_cache[slot][0].name
node_idx = self.read_load_balancer.get_server_index(
primary_name, len(self.slots_cache[slot])
primary_name, len(self.slots_cache[slot]), load_balancing_strategy
)
elif (
server_type is None
Expand Down Expand Up @@ -1730,7 +1796,7 @@ def __init__(
first command execution. The node will be determined by:
1. Hashing the channel name in the request to find its keyslot
2. Selecting a node that handles the keyslot: If read_from_replicas is
set to true, a replica can be selected.
set to true or load_balancing_strategy is set, a replica can be selected.

:type redis_cluster: RedisCluster
:type node: ClusterNode
Expand Down Expand Up @@ -1826,7 +1892,9 @@ def execute_command(self, *args):
channel = args[1]
slot = self.cluster.keyslot(channel)
node = self.cluster.nodes_manager.get_node_from_slot(
slot, self.cluster.read_from_replicas
slot,
self.cluster.read_from_replicas,
self.cluster.load_balancing_strategy,
)
else:
# Get a random node
Expand Down Expand Up @@ -1969,6 +2037,7 @@ def __init__(
cluster_response_callbacks: Optional[Dict[str, Callable]] = None,
startup_nodes: Optional[List["ClusterNode"]] = None,
read_from_replicas: bool = False,
load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
cluster_error_retry_attempts: int = 3,
reinitialize_steps: int = 5,
lock=None,
Expand All @@ -1984,6 +2053,7 @@ def __init__(
)
self.startup_nodes = startup_nodes if startup_nodes else []
self.read_from_replicas = read_from_replicas
self.load_balancing_strategy = load_balancing_strategy
self.command_flags = self.__class__.COMMAND_FLAGS.copy()
self.cluster_response_callbacks = cluster_response_callbacks
self.cluster_error_retry_attempts = cluster_error_retry_attempts
Expand Down
Loading
Loading