Skip to content

async_cluster: optimisations #2205

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions benchmarks/cluster_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,8 @@ async def main(loop, gather=None):
port = 16379
password = None

count = 1000
size = 16
count = 10000
size = 256

asyncio.run(main("asyncio"))
asyncio.run(main("asyncio", gather=False))
Expand Down
107 changes: 107 additions & 0 deletions benchmarks/cluster_async_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import asyncio
import functools
import time

import aioredis_cluster
import aredis
import uvloop

import redis.asyncio as redispy


def timer(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
tic = time.perf_counter()
await func(*args, **kwargs)
toc = time.perf_counter()
return f"{toc - tic:.4f}"

return wrapper


@timer
async def warmup(client):
await asyncio.gather(
*(asyncio.create_task(client.exists(f"bench:warmup_{i}")) for i in range(100))
)


@timer
async def run(client):
data_str = "a" * size
data_int = int("1" * size)

for i in range(count):
with client.pipeline() as pipe:
await (
pipe.set(f"bench:str_{i}", data_str)
.set(f"bench:int_{i}", data_int)
.get(f"bench:str_{i}")
.get(f"bench:int_{i}")
.hset("bench:hset", str(i), data_str)
.hget("bench:hset", str(i))
.incr("bench:incr")
.lpush("bench:lpush", data_int)
.lrange("bench:lpush", 0, 300)
.lpop("bench:lpush")
.execute()
)


async def main(loop):
arc = aredis.StrictRedisCluster(
host=host,
port=port,
password=password,
max_connections=2**31,
max_connections_per_node=2**31,
readonly=False,
reinitialize_steps=count,
skip_full_coverage_check=True,
decode_responses=False,
max_idle_time=count,
idle_check_interval=count,
)
print(f"{loop} {await warmup(arc)} aredis")
print(await run(arc))
arc.connection_pool.disconnect()

aiorc = await aioredis_cluster.create_redis_cluster(
[(host, port)],
password=password,
state_reload_interval=count,
idle_connection_timeout=count,
pool_maxsize=2**31,
)
print(f"{loop} {await warmup(aiorc)} aioredis-cluster")
print(await run(aiorc))
aiorc.close()
await aiorc.wait_closed()

async with redispy.RedisCluster(
host=host,
port=port,
password=password,
reinitialize_steps=count,
read_from_replicas=False,
decode_responses=False,
max_connections=2**31,
) as rca:
print(f"{loop} {await warmup(rca)} redispy")
print(await run(rca))


if __name__ == "__main__":
host = "localhost"
port = 16379
password = None

count = 10000
size = 256

asyncio.run(main("asyncio"))

uvloop.install()

asyncio.run(main("uvloop"))
165 changes: 86 additions & 79 deletions redis/asyncio/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,9 +468,8 @@ def keyslot(self, key: EncodableT) -> int:
return key_slot(k)

async def _determine_nodes(
self, *args: Any, node_flag: Optional[str] = None
self, command: str, *args: Any, node_flag: Optional[str] = None
) -> List["ClusterNode"]:
command = args[0]
if not node_flag:
# get the nodes group for this command if it was predefined
node_flag = self.command_flags.get(command)
Expand All @@ -495,16 +494,15 @@ async def _determine_nodes(
# get the node that holds the key's slot
return [
self.nodes_manager.get_node_from_slot(
await self._determine_slot(*args),
await self._determine_slot(command, *args),
self.read_from_replicas and command in READ_COMMANDS,
)
]

async def _determine_slot(self, *args: Any) -> int:
command = args[0]
async def _determine_slot(self, command: str, *args: Any) -> int:
if self.command_flags.get(command) == SLOT_ID:
# The command contains the slot ID
return int(args[1])
return int(args[0])

# Get the keys in the command

Expand All @@ -516,19 +514,17 @@ async def _determine_slot(self, *args: Any) -> int:
# - fix: https://github.com/redis/redis/pull/9733
if command in ("EVAL", "EVALSHA"):
# command syntax: EVAL "script body" num_keys ...
if len(args) <= 2:
raise RedisClusterException(f"Invalid args in command: {args}")
num_actual_keys = args[2]
eval_keys = args[3 : 3 + num_actual_keys]
if len(args) < 2:
raise RedisClusterException(
f"Invalid args in command: {command, *args}"
)
keys = args[2 : 2 + args[1]]
# if there are 0 keys, that means the script can be run on any node
# so we can just return a random slot
if not eval_keys:
if not keys:
return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
keys = eval_keys
else:
keys = await self.commands_parser.get_keys(
self.nodes_manager.default_node, *args
)
keys = await self.commands_parser.get_keys(command, *args)
if not keys:
# FCALL can call a function with 0 keys, that means the function
# can be run on any node so we can just return a random slot
Expand Down Expand Up @@ -848,13 +844,13 @@ def acquire_connection(self) -> Connection:
self._free.append(connection)

return self._free.popleft()
else:
if len(self._connections) < self.max_connections:
connection = self.connection_class(**self.connection_kwargs)
self._connections.append(connection)
return connection
else:
raise ConnectionError("Too many connections")

if len(self._connections) < self.max_connections:
connection = self.connection_class(**self.connection_kwargs)
self._connections.append(connection)
return connection

raise ConnectionError("Too many connections")

async def parse_response(
self, connection: Connection, command: str, **kwargs: Any
Expand All @@ -872,10 +868,10 @@ async def parse_response(
raise

# Return response
try:
if command in self.response_callbacks:
return self.response_callbacks[command](response, **kwargs)
except KeyError:
return response

return response

async def execute_command(self, *args: Any, **kwargs: Any) -> Any:
# Acquire connection
Expand All @@ -891,7 +887,7 @@ async def execute_command(self, *args: Any, **kwargs: Any) -> Any:
# Release connection
self._free.append(connection)

async def execute_pipeline(self) -> None:
async def execute_pipeline(self) -> bool:
# Acquire connection
connection = self.acquire_connection()

Expand All @@ -901,17 +897,20 @@ async def execute_pipeline(self) -> None:
)

# Read responses
try:
for cmd in self._command_stack:
try:
cmd.result = await self.parse_response(
connection, cmd.args[0], **cmd.kwargs
)
except Exception as e:
cmd.result = e
finally:
# Release connection
self._free.append(connection)
ret = False
for cmd in self._command_stack:
try:
cmd.result = await self.parse_response(
connection, cmd.args[0], **cmd.kwargs
)
except Exception as e:
cmd.result = e
ret = True

# Release connection
self._free.append(connection)

return ret


class NodesManager:
Expand Down Expand Up @@ -1257,6 +1256,13 @@ async def __aexit__(self, exc_type: None, exc_value: None, traceback: None) -> N
def __await__(self) -> Generator[Any, None, "ClusterPipeline"]:
return self.initialize().__await__()

def __enter__(self) -> "ClusterPipeline":
self._command_stack = []
return self

def __exit__(self, exc_type: None, exc_value: None, traceback: None) -> None:
self._command_stack = []

def __bool__(self) -> bool:
return bool(self._command_stack)

Expand Down Expand Up @@ -1310,6 +1316,7 @@ async def execute(

try:
return await self._execute(
self._client,
self._command_stack,
raise_on_error=raise_on_error,
allow_redirections=allow_redirections,
Expand All @@ -1331,60 +1338,60 @@ async def execute(

async def _execute(
self,
client: "RedisCluster",
stack: List["PipelineCommand"],
raise_on_error: bool = True,
allow_redirections: bool = True,
) -> List[Any]:
client = self._client
todo = [
cmd for cmd in stack if not cmd.result or isinstance(cmd.result, Exception)
]

nodes = {}
for cmd in stack:
if not cmd.result or isinstance(cmd.result, Exception):
target_nodes = await client._determine_nodes(*cmd.args)
if not target_nodes:
raise RedisClusterException(
f"No targets were found to execute {cmd.args} command on"
)
if len(target_nodes) > 1:
raise RedisClusterException(
f"Too many targets for command {cmd.args}"
)
for cmd in todo:
target_nodes = await client._determine_nodes(*cmd.args)
if not target_nodes:
raise RedisClusterException(
f"No targets were found to execute {cmd.args} command on"
)
if len(target_nodes) > 1:
raise RedisClusterException(f"Too many targets for command {cmd.args}")

node = target_nodes[0]
if node.name not in nodes:
nodes[node.name] = node
node._command_stack = []
node._command_stack.append(cmd)
node = target_nodes[0]
if node.name not in nodes:
nodes[node.name] = node
node._command_stack = []
node._command_stack.append(cmd)

await asyncio.gather(
errors = await asyncio.gather(
*(asyncio.ensure_future(node.execute_pipeline()) for node in nodes.values())
)

if allow_redirections:
# send each errored command individually
for cmd in stack:
if isinstance(cmd.result, (TryAgainError, MovedError, AskError)):
try:
cmd.result = await client.execute_command(
*cmd.args, **cmd.kwargs
if any(errors):
if allow_redirections:
# send each errored command individually
for cmd in todo:
if isinstance(cmd.result, (TryAgainError, MovedError, AskError)):
try:
cmd.result = await client.execute_command(
*cmd.args, **cmd.kwargs
)
except Exception as e:
cmd.result = e

if raise_on_error:
for cmd in todo:
result = cmd.result
if isinstance(result, Exception):
command = " ".join(map(safe_str, cmd.args))
msg = (
f"Command # {cmd.position + 1} ({command}) of pipeline "
f"caused error: {result.args}"
)
except Exception as e:
cmd.result = e

responses = [cmd.result for cmd in stack]

if raise_on_error:
for cmd in stack:
result = cmd.result
if isinstance(result, Exception):
command = " ".join(map(safe_str, cmd.args))
msg = (
f"Command # {cmd.position + 1} ({command}) of pipeline "
f"caused error: {result.args}"
)
result.args = (msg,) + result.args[1:]
raise result
result.args = (msg,) + result.args[1:]
raise result

return responses
return [cmd.result for cmd in stack]

def _split_command_across_slots(
self, command: str, *keys: KeyT
Expand Down
Loading