Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RESP3 response-callbacks cleanup #2841

Merged
merged 9 commits into from
Jul 13, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion redis/asyncio/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
Union,
)

from redis._parsers import AsyncCommandsParser, Encoder
from redis.asyncio.client import ResponseCallbackT
from redis.asyncio.connection import Connection, DefaultParser, SSLConnection, parse_url
from redis.asyncio.lock import Lock
Expand Down Expand Up @@ -55,7 +56,6 @@
TimeoutError,
TryAgainError,
)
from redis.parsers import AsyncCommandsParser, Encoder
from redis.typing import AnyKeyT, EncodableT, KeyT
from redis.utils import dict_merge, safe_str, str_if_bytes

Expand Down
2 changes: 1 addition & 1 deletion redis/asyncio/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
from redis.typing import EncodableT
from redis.utils import HIREDIS_AVAILABLE, str_if_bytes

from ..parsers import (
from .._parsers import (
dvora-h marked this conversation as resolved.
Show resolved Hide resolved
BaseParser,
Encoder,
_AsyncHiredisParser,
Expand Down
256 changes: 108 additions & 148 deletions redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from redis.exceptions import (
ConnectionError,
ExecAbortError,
ModuleError,
PubSubError,
RedisError,
ResponseError,
Expand Down Expand Up @@ -96,13 +95,6 @@ def parse_debug_object(response):
return response


def parse_object(response, infotype):
"""Parse the results of an OBJECT command"""
if infotype in ("idletime", "refcount"):
return int_or_none(response)
return response


def parse_info(response):
"""Parse the result of Redis's INFO command into a Python dict"""
info = {}
Expand Down Expand Up @@ -278,12 +270,6 @@ def sort_return_tuples(response, **options):
return list(zip(*[response[i::n] for i in range(n)]))


def int_or_none(response):
if response is None:
return None
return int(response)


def parse_stream_list(response):
if response is None:
return None
Expand Down Expand Up @@ -711,12 +697,6 @@ def parse_client_info(value):
return client_info


def parse_module_result(response):
if isinstance(response, ModuleError):
raise response
return True


def parse_set_result(response, **options):
"""
Handle SET result since GET argument is available since Redis 6.2.
Expand All @@ -733,158 +713,138 @@ def parse_set_result(response, **options):

class AbstractRedis:
RESPONSE_CALLBACKS = {
**string_keys_to_dict("EXPIRE EXPIREAT PEXPIRE PEXPIREAT AUTH", bool),
**string_keys_to_dict("EXISTS", int),
**string_keys_to_dict("INCRBYFLOAT HINCRBYFLOAT", float),
**string_keys_to_dict("READONLY MSET", bool_ok),
"CLUSTER DELSLOTS": bool_ok,
"CLUSTER ADDSLOTS": bool_ok,
"COMMAND": parse_command,
"INFO": parse_info,
"SET": parse_set_result,
"CLIENT ID": int,
**string_keys_to_dict(
"AUTH COPY EXPIRE EXPIREAT HEXISTS HMSET MOVE MSETNX PERSIST PSETEX "
"PEXPIRE PEXPIREAT RENAMENX SETEX SETNX SMOVE",
bool,
),
**string_keys_to_dict("HINCRBYFLOAT INCRBYFLOAT", float),
**string_keys_to_dict(
"ASKING FLUSHALL FLUSHDB LSET LTRIM MSET PFMERGE READONLY READWRITE "
"RENAME SAVE SELECT SHUTDOWN SLAVEOF SWAPDB WATCH UNWATCH",
bool_ok,
),
**string_keys_to_dict("XREAD XREADGROUP", parse_xread),
**string_keys_to_dict(
"GEORADIUS GEORADIUSBYMEMBER GEOSEARCH", parse_geosearch_generic
),
**string_keys_to_dict("XRANGE XREVRANGE", parse_stream_list),
"ACL GETUSER": parse_acl_getuser,
"ACL LOAD": bool_ok,
"ACL LOG": parse_acl_log,
"ACL SETUSER": bool_ok,
"ACL SAVE": bool_ok,
"CLIENT INFO": parse_client_info,
"CLIENT KILL": parse_client_kill,
"CLIENT LIST": parse_client_list,
"CLIENT INFO": parse_client_info,
"CLIENT PAUSE": bool_ok,
"CLIENT SETNAME": bool_ok,
"CLIENT TRACKINGINFO": lambda r: list(map(str_if_bytes, r)),
"LASTSAVE": timestamp_to_datetime,
"RESET": str_if_bytes,
"SLOWLOG GET": parse_slowlog_get,
"TIME": lambda x: (int(x[0]), int(x[1])),
**string_keys_to_dict("BLPOP BRPOP", lambda r: r and tuple(r) or None),
"SCAN": parse_scan,
"CLIENT GETNAME": str_if_bytes,
"SSCAN": parse_scan,
"ACL LOG": parse_acl_log,
"ACL WHOAMI": str_if_bytes,
"ACL GENPASS": str_if_bytes,
"ACL CAT": lambda r: list(map(str_if_bytes, r)),
"HSCAN": parse_hscan,
"ZSCAN": parse_zscan,
**string_keys_to_dict(
"BZPOPMIN BZPOPMAX", lambda r: r and (r[0], r[1], float(r[2])) or None
),
"CLUSTER COUNT-FAILURE-REPORTS": lambda x: int(x),
"CLUSTER COUNTKEYSINSLOT": lambda x: int(x),
"CLIENT UNBLOCK": bool,
"CLUSTER ADDSLOTS": bool_ok,
"CLUSTER ADDSLOTSRANGE": bool_ok,
"CLUSTER DELSLOTS": bool_ok,
"CLUSTER DELSLOTSRANGE": bool_ok,
"CLUSTER FAILOVER": bool_ok,
"CLUSTER FORGET": bool_ok,
"CLUSTER INFO": parse_cluster_info,
"CLUSTER KEYSLOT": lambda x: int(x),
"CLUSTER MEET": bool_ok,
"CLUSTER NODES": parse_cluster_nodes,
"CLUSTER REPLICAS": parse_cluster_nodes,
"CLUSTER REPLICATE": bool_ok,
"CLUSTER RESET": bool_ok,
"CLUSTER SAVECONFIG": bool_ok,
"CLUSTER SET-CONFIG-EPOCH": bool_ok,
"CLUSTER SETSLOT": bool_ok,
"CLUSTER SLAVES": parse_cluster_nodes,
**string_keys_to_dict("GEODIST", float_or_none),
"GEOHASH": lambda r: list(map(str_if_bytes, r)),
"GEOPOS": lambda r: list(
map(lambda ll: (float(ll[0]), float(ll[1])) if ll is not None else None, r)
),
"GEOSEARCH": parse_geosearch_generic,
"GEORADIUS": parse_geosearch_generic,
"GEORADIUSBYMEMBER": parse_geosearch_generic,
"XAUTOCLAIM": parse_xautoclaim,
"XINFO STREAM": parse_xinfo_stream,
"XPENDING": parse_xpending,
**string_keys_to_dict("XREAD XREADGROUP", parse_xread),
"COMMAND GETKEYS": lambda r: list(map(str_if_bytes, r)),
**string_keys_to_dict("SORT", sort_return_tuples),
"COMMAND": parse_command,
"CONFIG RESETSTAT": bool_ok,
"CONFIG SET": bool_ok,
"FUNCTION DELETE": bool_ok,
"FUNCTION FLUSH": bool_ok,
"FUNCTION RESTORE": bool_ok,
"GEODIST": float_or_none,
"HSCAN": parse_hscan,
"INFO": parse_info,
"LASTSAVE": timestamp_to_datetime,
"MEMORY PURGE": bool_ok,
"MODULE LOAD": bool,
"MODULE UNLOAD": bool,
"PING": lambda r: str_if_bytes(r) == "PONG",
"ACL SETUSER": bool_ok,
"PUBSUB NUMSUB": parse_pubsub_numsub,
"QUIT": bool_ok,
"SET": parse_set_result,
"SCAN": parse_scan,
"SCRIPT EXISTS": lambda r: list(map(bool, r)),
"SCRIPT FLUSH": bool_ok,
"SCRIPT KILL": bool_ok,
"SCRIPT LOAD": str_if_bytes,
"ACL GETUSER": parse_acl_getuser,
"CONFIG SET": bool_ok,
**string_keys_to_dict("XREVRANGE XRANGE", parse_stream_list),
"SENTINEL CKQUORUM": bool_ok,
"SENTINEL FAILOVER": bool_ok,
"SENTINEL FLUSHCONFIG": bool_ok,
"SENTINEL GET-MASTER-ADDR-BY-NAME": parse_sentinel_get_master,
"SENTINEL MONITOR": bool_ok,
"SENTINEL RESET": bool_ok,
"SENTINEL REMOVE": bool_ok,
"SENTINEL SET": bool_ok,
"SLOWLOG GET": parse_slowlog_get,
"SLOWLOG RESET": bool_ok,
"SORT": sort_return_tuples,
"SSCAN": parse_scan,
"TIME": lambda x: (int(x[0]), int(x[1])),
"XAUTOCLAIM": parse_xautoclaim,
"XCLAIM": parse_xclaim,
"CLUSTER SET-CONFIG-EPOCH": bool_ok,
"CLUSTER REPLICAS": parse_cluster_nodes,
"ACL LIST": lambda r: list(map(str_if_bytes, r)),
"XGROUP CREATE": bool_ok,
"XGROUP DESTROY": bool,
"XGROUP SETID": bool_ok,
"XINFO STREAM": parse_xinfo_stream,
"XPENDING": parse_xpending,
"ZSCAN": parse_zscan,
}

RESP2_RESPONSE_CALLBACKS = {
"CONFIG GET": parse_config_get,
**string_keys_to_dict(
"SDIFF SINTER SMEMBERS SUNION", lambda r: r and set(r) or set()
),
**string_keys_to_dict("READWRITE", bool_ok),
**string_keys_to_dict(
"ZPOPMAX ZPOPMIN ZINTER ZDIFF ZUNION ZRANGE ZRANGEBYSCORE "
"ZREVRANGE ZREVRANGEBYSCORE",
"ZDIFF ZINTER ZPOPMAX ZPOPMIN ZRANGE ZRANGEBYSCORE ZRANK ZREVRANGE "
"ZREVRANGEBYSCORE ZREVRANK ZUNION",
zset_score_pairs,
),
**string_keys_to_dict("ZSCORE ZINCRBY", float_or_none),
"ZADD": parse_zadd,
"ZMSCORE": parse_zmscore,
**string_keys_to_dict("ZINCRBY ZSCORE", float_or_none),
**string_keys_to_dict("BGREWRITEAOF BGSAVE", lambda r: True),
**string_keys_to_dict("BLPOP BRPOP", lambda r: r and tuple(r) or None),
**string_keys_to_dict(
"BZPOPMAX BZPOPMIN", lambda r: r and (r[0], r[1], float(r[2])) or None
),
"ACL CAT": lambda r: list(map(str_if_bytes, r)),
"ACL GENPASS": str_if_bytes,
"ACL HELP": lambda r: list(map(str_if_bytes, r)),
"ACL LIST": lambda r: list(map(str_if_bytes, r)),
"ACL USERS": lambda r: list(map(str_if_bytes, r)),
"ACL WHOAMI": str_if_bytes,
"CLIENT GETNAME": str_if_bytes,
"CLIENT TRACKINGINFO": lambda r: list(map(str_if_bytes, r)),
"CLUSTER GETKEYSINSLOT": lambda r: list(map(str_if_bytes, r)),
"COMMAND GETKEYS": lambda r: list(map(str_if_bytes, r)),
"CONFIG GET": parse_config_get,
"DEBUG OBJECT": parse_debug_object,
"GEOHASH": lambda r: list(map(str_if_bytes, r)),
"GEOPOS": lambda r: list(
map(lambda ll: (float(ll[0]), float(ll[1])) if ll is not None else None, r)
),
"HGETALL": lambda r: r and pairs_to_dict(r) or {},
"MEMORY STATS": parse_memory_stats,
"MODULE LIST": lambda r: [pairs_to_dict(m) for m in r],
"RESET": str_if_bytes,
"STRALGO": parse_stralgo,
# **string_keys_to_dict(
# "COPY "
# "HEXISTS HMSET MOVE MSETNX PERSIST "
# "PSETEX RENAMENX SMOVE SETEX SETNX",
# bool,
# ),
# **string_keys_to_dict(
# "HSTRLEN INCRBY LINSERT LLEN LPUSHX PFADD PFCOUNT RPUSHX SADD "
# "SCARD SDIFFSTORE SETBIT SETRANGE SINTERSTORE SREM STRLEN "
# "SUNIONSTORE UNLINK XACK XDEL XLEN XTRIM ZCARD ZLEXCOUNT ZREM "
# "ZREMRANGEBYLEX ZREMRANGEBYRANK ZREMRANGEBYSCORE",
# int,
# ),
# **string_keys_to_dict(
# "FLUSHALL FLUSHDB LSET LTRIM PFMERGE ASKING "
# "RENAME SAVE SELECT SHUTDOWN SLAVEOF SWAPDB WATCH UNWATCH ",
# bool_ok,
# ),
# **string_keys_to_dict("ZRANK ZREVRANK", int_or_none),
# **string_keys_to_dict("BGREWRITEAOF BGSAVE", lambda r: True),
# "ACL HELP": lambda r: list(map(str_if_bytes, r)),
# "ACL LOAD": bool_ok,
# "ACL SAVE": bool_ok,
# "ACL USERS": lambda r: list(map(str_if_bytes, r)),
# "CLIENT UNBLOCK": lambda r: r and int(r) == 1 or False,
# "CLIENT PAUSE": bool_ok,
# "CLUSTER ADDSLOTSRANGE": bool_ok,
# "CLUSTER DELSLOTSRANGE": bool_ok,
# "CLUSTER GETKEYSINSLOT": lambda r: list(map(str_if_bytes, r)),
# "CONFIG RESETSTAT": bool_ok,
# "DEBUG OBJECT": parse_debug_object,
# "FUNCTION DELETE": bool_ok,
# "FUNCTION FLUSH": bool_ok,
# "FUNCTION RESTORE": bool_ok,
# "MEMORY PURGE": bool_ok,
# "MEMORY USAGE": int_or_none,
# "MODULE LOAD": parse_module_result,
# "MODULE UNLOAD": parse_module_result,
# "OBJECT": parse_object,
# "QUIT": bool_ok,
# "RANDOMKEY": lambda r: r and r or None,
# "SCRIPT EXISTS": lambda r: list(map(bool, r)),
# "SCRIPT KILL": bool_ok,
# "SENTINEL CKQUORUM": bool_ok,
# "SENTINEL FAILOVER": bool_ok,
# "SENTINEL FLUSHCONFIG": bool_ok,
# "SENTINEL GET-MASTER-ADDR-BY-NAME": parse_sentinel_get_master,
"XINFO CONSUMERS": parse_list_of_dicts,
"XINFO GROUPS": parse_list_of_dicts,
"ZADD": parse_zadd,
"ZMSCORE": parse_zmscore,
# "SENTINEL MASTER": parse_sentinel_master,
# "SENTINEL MASTERS": parse_sentinel_masters,
# "SENTINEL MONITOR": bool_ok,
# "SENTINEL RESET": bool_ok,
# "SENTINEL REMOVE": bool_ok,
# "SENTINEL SENTINELS": parse_sentinel_slaves_and_sentinels,
# "SENTINEL SET": bool_ok,
# "SENTINEL SLAVES": parse_sentinel_slaves_and_sentinels,
# "SLOWLOG RESET": bool_ok,
# "XGROUP CREATE": bool_ok,
# "XGROUP DESTROY": bool,
# "XGROUP SETID": bool_ok,
"XINFO CONSUMERS": parse_list_of_dicts,
"XINFO GROUPS": parse_list_of_dicts,
}

RESP3_RESPONSE_CALLBACKS = {
Expand All @@ -893,6 +853,14 @@ class AbstractRedis:
"ZUNION HGETALL XREADGROUP",
lambda r, **kwargs: r,
),
**string_keys_to_dict("XREAD XREADGROUP", parse_xread_resp3),
"ACL LOG": lambda r: [
{str_if_bytes(key): str_if_bytes(value) for key, value in x.items()}
for x in r
]
if isinstance(r, list)
else bool_ok(r),
"COMMAND": parse_command_resp3,
"CONFIG GET": lambda r: {
str_if_bytes(key)
if key is not None
Expand All @@ -901,14 +869,9 @@ class AbstractRedis:
else None
for key, value in r.items()
},
"ACL LOG": lambda r: [
{str_if_bytes(key): str_if_bytes(value) for key, value in x.items()}
for x in r
]
if isinstance(r, list)
else bool_ok(r),
**string_keys_to_dict("XREAD XREADGROUP", parse_xread_resp3),
"COMMAND": parse_command_resp3,
"MEMORY STATS": lambda r: {
str_if_bytes(key): value for key, value in r.items()
},
"STRALGO": lambda r, **options: {
str_if_bytes(key): str_if_bytes(value) for key, value in r.items()
}
Expand All @@ -917,9 +880,6 @@ class AbstractRedis:
"XINFO CONSUMERS": lambda r: [
{str_if_bytes(key): value for key, value in x.items()} for x in r
],
"MEMORY STATS": lambda r: {
str_if_bytes(key): value for key, value in r.items()
},
"XINFO GROUPS": lambda r: [
{str_if_bytes(key): value for key, value in d.items()} for d in r
],
Expand Down
2 changes: 1 addition & 1 deletion redis/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from collections import OrderedDict
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

from redis._parsers import CommandsParser, Encoder
from redis.backoff import default_backoff
from redis.client import CaseInsensitiveDict, PubSub, Redis, parse_scan
from redis.commands import READ_COMMANDS, RedisClusterCommands
Expand All @@ -30,7 +31,6 @@
TryAgainError,
)
from redis.lock import Lock
from redis.parsers import CommandsParser, Encoder
from redis.retry import Retry
from redis.utils import (
HIREDIS_AVAILABLE,
Expand Down
Loading
Loading