Add Async RedisCluster #2099


Merged: 27 commits, May 8, 2022

Commits
e7fc06b  Copy Cluster Client, Commands, Commands Parser, Tests for asyncio (utkarshgupta137, Apr 10, 2022)
0b07706  Async Cluster Tests: Async/Await (utkarshgupta137, Apr 10, 2022)
96d992c  Add Async RedisCluster (utkarshgupta137, Apr 10, 2022)
921152b  cluster: use ERRORS_ALLOW_RETRY from self.__class__ (utkarshgupta137, Apr 14, 2022)
48c3530  async_cluster: rework redis_connection, initialize, & close (utkarshgupta137, Apr 15, 2022)
1707a5a  async_cluster: add types (utkarshgupta137, Apr 15, 2022)
6627afa  async_cluster: add docs (utkarshgupta137, Apr 15, 2022)
7e6f9f1  docs: update sphinx & add sphinx_autodoc_typehints (utkarshgupta137, Apr 15, 2022)
9ce1fed  async_cluster: move TargetNodesT to cluster module (utkarshgupta137, Apr 18, 2022)
66893f6  async_cluster/commands: inherit commands from sync class if possible (utkarshgupta137, Apr 18, 2022)
7ac208f  async_cluster: add benchmark script with aredis & aioredis-cluster (utkarshgupta137, Apr 19, 2022)
0b6fc92  async_cluster: remove logging (utkarshgupta137, Apr 20, 2022)
8cbf56d  async_cluster: inline functions (utkarshgupta137, Apr 22, 2022)
cf1b3e3  async_cluster: manage Connection instead of Redis Client (utkarshgupta137, Apr 22, 2022)
3379475  async_cluster/commands: optimize parser (utkarshgupta137, Apr 22, 2022)
9566a7f  async_cluster: use ensure_future & generators for gather (utkarshgupta137, Apr 22, 2022)
ee338b1  async_conn: optimize (utkarshgupta137, Apr 23, 2022)
c4abee1  async_cluster: optimize determine_slot (utkarshgupta137, Apr 22, 2022)
8dad0a2  async_cluster: optimize determine_nodes (utkarshgupta137, Apr 22, 2022)
0a48fff  async_cluster/parser: optimize _get_moveable_keys (utkarshgupta137, Apr 27, 2022)
9a4c741  async_cluster: inlined check_slots_coverage (utkarshgupta137, Apr 27, 2022)
4344145  async_cluster: update docstrings (utkarshgupta137, Apr 27, 2022)
22506e2  async_cluster: add concurrent test & use read_response/_update_moved_… (utkarshgupta137, Apr 27, 2022)
a5f6e4b  Merge remote-tracking branch 'redis/master' (utkarshgupta137, Apr 27, 2022)
b3cd82b  Merge remote-tracking branch 'original/master' (utkarshgupta137, Apr 28, 2022)
9b1c983  Merge branch 'master' into master (chayim, May 2, 2022)
8e0b461  Merge branch 'master' into master (chayim, May 3, 2022)
263 changes: 263 additions & 0 deletions benchmarks/cluster_async.py
@@ -0,0 +1,263 @@
import asyncio
import functools
import time

import aioredis_cluster
import aredis
import uvloop

import redis.asyncio as redispy


def timer(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
tic = time.perf_counter()
await func(*args, **kwargs)
toc = time.perf_counter()
return f"{toc - tic:.4f}"

return wrapper


@timer
async def set_str(client, gather, data):
if gather:
for _ in range(count // 100):
await asyncio.gather(
*(
asyncio.create_task(client.set(f"bench:str_{i}", data))
for i in range(100)
)
)
else:
for i in range(count):
await client.set(f"bench:str_{i}", data)


@timer
async def set_int(client, gather, data):
if gather:
for _ in range(count // 100):
await asyncio.gather(
*(
asyncio.create_task(client.set(f"bench:int_{i}", data))
for i in range(100)
)
)
else:
for i in range(count):
await client.set(f"bench:int_{i}", data)


@timer
async def get_str(client, gather):
if gather:
for _ in range(count // 100):
await asyncio.gather(
*(asyncio.create_task(client.get(f"bench:str_{i}")) for i in range(100))
)
else:
for i in range(count):
await client.get(f"bench:str_{i}")


@timer
async def get_int(client, gather):
if gather:
for _ in range(count // 100):
await asyncio.gather(
*(asyncio.create_task(client.get(f"bench:int_{i}")) for i in range(100))
)
else:
for i in range(count):
await client.get(f"bench:int_{i}")


@timer
async def hset(client, gather, data):
if gather:
for _ in range(count // 100):
await asyncio.gather(
*(
asyncio.create_task(client.hset("bench:hset", str(i), data))
for i in range(100)
)
)
else:
for i in range(count):
await client.hset("bench:hset", str(i), data)


@timer
async def hget(client, gather):
if gather:
for _ in range(count // 100):
await asyncio.gather(
*(
asyncio.create_task(client.hget("bench:hset", str(i)))
for i in range(100)
)
)
else:
for i in range(count):
await client.hget("bench:hset", str(i))


@timer
async def incr(client, gather):
if gather:
for _ in range(count // 100):
await asyncio.gather(
*(asyncio.create_task(client.incr("bench:incr")) for i in range(100))
)
else:
for i in range(count):
await client.incr("bench:incr")


@timer
async def lpush(client, gather, data):
if gather:
for _ in range(count // 100):
await asyncio.gather(
*(
asyncio.create_task(client.lpush("bench:lpush", data))
for i in range(100)
)
)
else:
for i in range(count):
await client.lpush("bench:lpush", data)


@timer
async def lrange_300(client, gather):
if gather:
for _ in range(count // 100):
await asyncio.gather(
*(
asyncio.create_task(client.lrange("bench:lpush", i, i + 300))
for i in range(100)
)
)
else:
for i in range(count):
await client.lrange("bench:lpush", i, i + 300)


@timer
async def lpop(client, gather):
if gather:
for _ in range(count // 100):
await asyncio.gather(
*(asyncio.create_task(client.lpop("bench:lpush")) for i in range(100))
)
else:
for i in range(count):
await client.lpop("bench:lpush")


@timer
async def warmup(client):
await asyncio.gather(
*(asyncio.create_task(client.exists(f"bench:warmup_{i}")) for i in range(100))
)


@timer
async def run(client, gather):
data_str = "a" * size
data_int = int("1" * size)

if gather is False:
for ret in await asyncio.gather(
asyncio.create_task(set_str(client, gather, data_str)),
asyncio.create_task(set_int(client, gather, data_int)),
asyncio.create_task(hset(client, gather, data_str)),
asyncio.create_task(incr(client, gather)),
asyncio.create_task(lpush(client, gather, data_int)),
):
print(ret)
for ret in await asyncio.gather(
asyncio.create_task(get_str(client, gather)),
asyncio.create_task(get_int(client, gather)),
asyncio.create_task(hget(client, gather)),
asyncio.create_task(lrange_300(client, gather)),
asyncio.create_task(lpop(client, gather)),
):
print(ret)
else:
print(await set_str(client, gather, data_str))
print(await set_int(client, gather, data_int))
print(await hset(client, gather, data_str))
print(await incr(client, gather))
print(await lpush(client, gather, data_int))

print(await get_str(client, gather))
print(await get_int(client, gather))
print(await hget(client, gather))
print(await lrange_300(client, gather))
print(await lpop(client, gather))


async def main(loop, gather=None):
arc = aredis.StrictRedisCluster(
host=host,
port=port,
password=password,
max_connections=2 ** 31,
max_connections_per_node=2 ** 31,
readonly=False,
reinitialize_steps=count,
skip_full_coverage_check=True,
decode_responses=False,
max_idle_time=count,
idle_check_interval=count,
)
print(f"{loop} {gather} {await warmup(arc)} aredis")
print(await run(arc, gather=gather))
arc.connection_pool.disconnect()

aiorc = await aioredis_cluster.create_redis_cluster(
[(host, port)],
password=password,
state_reload_interval=count,
idle_connection_timeout=count,
pool_maxsize=2 ** 31,
)
print(f"{loop} {gather} {await warmup(aiorc)} aioredis-cluster")
print(await run(aiorc, gather=gather))
aiorc.close()
await aiorc.wait_closed()

async with redispy.RedisCluster(
host=host,
port=port,
password=password,
reinitialize_steps=count,
read_from_replicas=False,
decode_responses=False,
max_connections=2 ** 31,
) as rca:
print(f"{loop} {gather} {await warmup(rca)} redispy")
print(await run(rca, gather=gather))


if __name__ == "__main__":
host = "localhost"
port = 16379
password = None

count = 1000
size = 16

asyncio.run(main("asyncio"))
asyncio.run(main("asyncio", gather=False))
asyncio.run(main("asyncio", gather=True))

uvloop.install()

asyncio.run(main("uvloop"))
asyncio.run(main("uvloop", gather=False))
asyncio.run(main("uvloop", gather=True))
9 changes: 7 additions & 2 deletions docs/conf.py
@@ -30,6 +30,7 @@
"nbsphinx",
"sphinx_gallery.load_style",
"sphinx.ext.autodoc",
"sphinx_autodoc_typehints",
"sphinx.ext.doctest",
"sphinx.ext.viewcode",
"sphinx.ext.autosectionlabel",
@@ -41,6 +42,10 @@
autosectionlabel_prefix_document = True
autosectionlabel_maxdepth = 2

# AutodocTypehints settings.
always_document_param_types = True
Contributor: Awesome. Had no idea this was an option.

typehints_defaults = "comma"

# Add any paths that contain templates here, relative to this directory.
templates_path = ["_templates"]

@@ -210,7 +215,7 @@
# (source start file, target name, title, author, documentclass
# [howto/manual]).
latex_documents = [
("index", "redis-py.tex", "redis-py Documentation", "Redis Inc", "manual"),
("index", "redis-py.tex", "redis-py Documentation", "Redis Inc", "manual")
]

# The name of an image file (relative to this directory) to place at the top of
@@ -258,7 +263,7 @@
"redis-py",
"One line description of project.",
"Miscellaneous",
),
)
]

# Documents to append as an appendix to all manuals.
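
A brief aside on the sphinx_autodoc_typehints settings added to docs/conf.py above: the sketch below (not part of this PR) shows the kind of function they affect. With always_document_param_types = True, autodoc documents the annotated parameter types even when the docstring has no :type: fields, and typehints_defaults = "comma" appends the default value to the parameter description.

# Hypothetical function, for illustration only (not from this PR).
# Under the settings above, Sphinx documents ``name`` as str and
# ``retries`` as int, and appends ", defaults to 3" to the
# description of ``retries``.
def lookup_node(name: str, retries: int = 3) -> str:
    """Return the address of the cluster node registered under ``name``.

    :param name: registered node name
    :param retries: attempts before giving up
    """
    return ""
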