Skip to content

Commit

Permalink
CSC Review Fixes - expose delete functions, rename attribute names, a…
Browse files Browse the repository at this point in the history
…dd AbstractCache class (#3110)

* CSC review fixes

* cahnge cache_max_size default value

* use ABC and add docstring
  • Loading branch information
dvora-h authored and vladvildanov committed Sep 27, 2024
1 parent 0b3e7eb commit f70cabb
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 60 deletions.
46 changes: 39 additions & 7 deletions redis/_cache.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import copy
import random
import time
from abc import ABC, abstractmethod
from collections import OrderedDict, defaultdict
from enum import Enum
from typing import List
Expand Down Expand Up @@ -160,7 +161,38 @@ class EvictionPolicy(Enum):
RANDOM = "random"


class _LocalCache:
class AbstractCache(ABC):
"""
An abstract base class for client caching implementations.
If you want to implement your own cache you must support these methods.
"""

@abstractmethod
def set(self, command: str, response: ResponseT, keys_in_command: List[KeyT]):
pass

@abstractmethod
def get(self, command: str) -> ResponseT:
pass

@abstractmethod
def delete_command(self, command: str):
pass

@abstractmethod
def delete_many(self, commands):
pass

@abstractmethod
def flush(self):
pass

@abstractmethod
def invalidate_key(self, key: KeyT):
pass


class _LocalCache(AbstractCache):
"""
A caching mechanism for storing redis commands and their responses.
Expand All @@ -180,7 +212,7 @@ class _LocalCache:

def __init__(
self,
max_size: int = 100,
max_size: int = 10000,
ttl: int = 0,
eviction_policy: EvictionPolicy = DEFAULT_EVICTION_POLICY,
**kwargs,
Expand Down Expand Up @@ -224,12 +256,12 @@ def get(self, command: str) -> ResponseT:
"""
if command in self.cache:
if self._is_expired(command):
self.delete(command)
self.delete_command(command)
return
self._update_access(command)
return copy.deepcopy(self.cache[command]["response"])

def delete(self, command: str):
def delete_command(self, command: str):
"""
Delete a redis command and its metadata from the cache.
Expand Down Expand Up @@ -285,7 +317,7 @@ def _update_access(self, command: str):
def _evict(self):
"""Evict a redis command from the cache based on the eviction policy."""
if self._is_expired(self.commands_ttl_list[0]):
self.delete(self.commands_ttl_list[0])
self.delete_command(self.commands_ttl_list[0])
elif self.eviction_policy == EvictionPolicy.LRU.value:
self.cache.popitem(last=False)
elif self.eviction_policy == EvictionPolicy.LFU.value:
Expand Down Expand Up @@ -319,7 +351,7 @@ def _del_key_commands_map(self, keys: List[KeyT], command: str):
for key in keys:
self.key_commands_map[key].remove(command)

def invalidate(self, key: KeyT):
def invalidate_key(self, key: KeyT):
"""
Invalidate (delete) all redis commands associated with a specific key.
Expand All @@ -330,4 +362,4 @@ def invalidate(self, key: KeyT):
return
commands = list(self.key_commands_map[key])
for command in commands:
self.delete(command)
self.delete_command(command)
39 changes: 33 additions & 6 deletions redis/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
DEFAULT_BLACKLIST,
DEFAULT_EVICTION_POLICY,
DEFAULT_WHITELIST,
_LocalCache,
AbstractCache,
)
from redis._parsers.helpers import (
_RedisCallbacks,
Expand Down Expand Up @@ -237,11 +237,11 @@ def __init__(
redis_connect_func=None,
credential_provider: Optional[CredentialProvider] = None,
protocol: Optional[int] = 2,
cache_enable: bool = False,
client_cache: Optional[_LocalCache] = None,
cache_enabled: bool = False,
client_cache: Optional[AbstractCache] = None,
cache_max_size: int = 100,
cache_ttl: int = 0,
cache_eviction_policy: str = DEFAULT_EVICTION_POLICY,
cache_policy: str = DEFAULT_EVICTION_POLICY,
cache_blacklist: List[str] = DEFAULT_BLACKLIST,
cache_whitelist: List[str] = DEFAULT_WHITELIST,
):
Expand Down Expand Up @@ -293,11 +293,11 @@ def __init__(
"lib_version": lib_version,
"redis_connect_func": redis_connect_func,
"protocol": protocol,
"cache_enable": cache_enable,
"cache_enabled": cache_enabled,
"client_cache": client_cache,
"cache_max_size": cache_max_size,
"cache_ttl": cache_ttl,
"cache_eviction_policy": cache_eviction_policy,
"cache_policy": cache_policy,
"cache_blacklist": cache_blacklist,
"cache_whitelist": cache_whitelist,
}
Expand Down Expand Up @@ -667,6 +667,33 @@ async def parse_response(
return await retval if inspect.isawaitable(retval) else retval
return response

def flush_cache(self):
try:
if self.connection:
self.connection.client_cache.flush()
else:
self.connection_pool.flush_cache()
except AttributeError:
pass

def delete_command_from_cache(self, command):
try:
if self.connection:
self.connection.client_cache.delete_command(command)
else:
self.connection_pool.delete_command_from_cache(command)
except AttributeError:
pass

def invalidate_key_from_cache(self, key):
try:
if self.connection:
self.connection.client_cache.invalidate_key(key)
else:
self.connection_pool.invalidate_key_from_cache(key)
except AttributeError:
pass


StrictRedis = Redis

Expand Down
12 changes: 6 additions & 6 deletions redis/asyncio/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
DEFAULT_BLACKLIST,
DEFAULT_EVICTION_POLICY,
DEFAULT_WHITELIST,
_LocalCache,
AbstractCache,
)
from redis._parsers import AsyncCommandsParser, Encoder
from redis._parsers.helpers import (
Expand Down Expand Up @@ -273,11 +273,11 @@ def __init__(
ssl_keyfile: Optional[str] = None,
protocol: Optional[int] = 2,
address_remap: Optional[Callable[[str, int], Tuple[str, int]]] = None,
cache_enable: bool = False,
client_cache: Optional[_LocalCache] = None,
cache_enabled: bool = False,
client_cache: Optional[AbstractCache] = None,
cache_max_size: int = 100,
cache_ttl: int = 0,
cache_eviction_policy: str = DEFAULT_EVICTION_POLICY,
cache_policy: str = DEFAULT_EVICTION_POLICY,
cache_blacklist: List[str] = DEFAULT_BLACKLIST,
cache_whitelist: List[str] = DEFAULT_WHITELIST,
) -> None:
Expand Down Expand Up @@ -324,11 +324,11 @@ def __init__(
"retry": retry,
"protocol": protocol,
# Client cache related kwargs
"cache_enable": cache_enable,
"cache_enabled": cache_enabled,
"client_cache": client_cache,
"cache_max_size": cache_max_size,
"cache_ttl": cache_ttl,
"cache_eviction_policy": cache_eviction_policy,
"cache_policy": cache_policy,
"cache_blacklist": cache_blacklist,
"cache_whitelist": cache_whitelist,
}
Expand Down
54 changes: 38 additions & 16 deletions redis/asyncio/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
DEFAULT_BLACKLIST,
DEFAULT_EVICTION_POLICY,
DEFAULT_WHITELIST,
AbstractCache,
_LocalCache,
)
from .._parsers import (
Expand Down Expand Up @@ -156,11 +157,11 @@ def __init__(
encoder_class: Type[Encoder] = Encoder,
credential_provider: Optional[CredentialProvider] = None,
protocol: Optional[int] = 2,
cache_enable: bool = False,
client_cache: Optional[_LocalCache] = None,
cache_max_size: int = 100,
cache_enabled: bool = False,
client_cache: Optional[AbstractCache] = None,
cache_max_size: int = 10000,
cache_ttl: int = 0,
cache_eviction_policy: str = DEFAULT_EVICTION_POLICY,
cache_policy: str = DEFAULT_EVICTION_POLICY,
cache_blacklist: List[str] = DEFAULT_BLACKLIST,
cache_whitelist: List[str] = DEFAULT_WHITELIST,
):
Expand Down Expand Up @@ -220,8 +221,8 @@ def __init__(
if p < 2 or p > 3:
raise ConnectionError("protocol must be either 2 or 3")
self.protocol = protocol
if cache_enable:
_cache = _LocalCache(cache_max_size, cache_ttl, cache_eviction_policy)
if cache_enabled:
_cache = _LocalCache(cache_max_size, cache_ttl, cache_policy)
else:
_cache = None
self.client_cache = client_cache if client_cache is not None else _cache
Expand Down Expand Up @@ -698,7 +699,7 @@ def _cache_invalidation_process(
self.client_cache.flush()
else:
for key in data[1]:
self.client_cache.invalidate(str_if_bytes(key))
self.client_cache.invalidate_key(str_if_bytes(key))

async def _get_from_local_cache(self, command: str):
"""
Expand Down Expand Up @@ -728,15 +729,6 @@ def _add_to_local_cache(
):
self.client_cache.set(command, response, keys)

def delete_from_local_cache(self, command: str):
"""
Delete the command from the local cache
"""
try:
self.client_cache.delete(command)
except AttributeError:
pass


class Connection(AbstractConnection):
"Manages TCP communication to and from a Redis server"
Expand Down Expand Up @@ -1240,6 +1232,36 @@ def set_retry(self, retry: "Retry") -> None:
for conn in self._in_use_connections:
conn.retry = retry

def flush_cache(self):
connections = chain(self._available_connections, self._in_use_connections)

for connection in connections:
try:
connection.client_cache.flush()
except AttributeError:
# cache is not enabled
pass

def delete_command_from_cache(self, command: str):
connections = chain(self._available_connections, self._in_use_connections)

for connection in connections:
try:
connection.client_cache.delete_command(command)
except AttributeError:
# cache is not enabled
pass

def invalidate_key_from_cache(self, key: str):
connections = chain(self._available_connections, self._in_use_connections)

for connection in connections:
try:
connection.client_cache.invalidate_key(key)
except AttributeError:
# cache is not enabled
pass


class BlockingConnectionPool(ConnectionPool):
"""
Expand Down
41 changes: 34 additions & 7 deletions redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
DEFAULT_BLACKLIST,
DEFAULT_EVICTION_POLICY,
DEFAULT_WHITELIST,
_LocalCache,
AbstractCache,
)
from redis._parsers.encoders import Encoder
from redis._parsers.helpers import (
Expand Down Expand Up @@ -209,11 +209,11 @@ def __init__(
redis_connect_func=None,
credential_provider: Optional[CredentialProvider] = None,
protocol: Optional[int] = 2,
cache_enable: bool = False,
client_cache: Optional[_LocalCache] = None,
cache_max_size: int = 100,
cache_enabled: bool = False,
client_cache: Optional[AbstractCache] = None,
cache_max_size: int = 10000,
cache_ttl: int = 0,
cache_eviction_policy: str = DEFAULT_EVICTION_POLICY,
cache_policy: str = DEFAULT_EVICTION_POLICY,
cache_blacklist: List[str] = DEFAULT_BLACKLIST,
cache_whitelist: List[str] = DEFAULT_WHITELIST,
) -> None:
Expand Down Expand Up @@ -267,11 +267,11 @@ def __init__(
"redis_connect_func": redis_connect_func,
"credential_provider": credential_provider,
"protocol": protocol,
"cache_enable": cache_enable,
"cache_enabled": cache_enabled,
"client_cache": client_cache,
"cache_max_size": cache_max_size,
"cache_ttl": cache_ttl,
"cache_eviction_policy": cache_eviction_policy,
"cache_policy": cache_policy,
"cache_blacklist": cache_blacklist,
"cache_whitelist": cache_whitelist,
}
Expand Down Expand Up @@ -589,6 +589,33 @@ def parse_response(self, connection, command_name, **options):
return self.response_callbacks[command_name](response, **options)
return response

def flush_cache(self):
try:
if self.connection:
self.connection.client_cache.flush()
else:
self.connection_pool.flush_cache()
except AttributeError:
pass

def delete_command_from_cache(self, command):
try:
if self.connection:
self.connection.client_cache.delete_command(command)
else:
self.connection_pool.delete_command_from_cache(command)
except AttributeError:
pass

def invalidate_key_from_cache(self, key):
try:
if self.connection:
self.connection.client_cache.invalidate_key(key)
else:
self.connection_pool.invalidate_key_from_cache(key)
except AttributeError:
pass


StrictRedis = Redis

Expand Down
4 changes: 2 additions & 2 deletions redis/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,11 @@ def parse_cluster_myshardid(resp, **options):
"ssl_password",
"unix_socket_path",
"username",
"cache_enable",
"cache_enabled",
"client_cache",
"cache_max_size",
"cache_ttl",
"cache_eviction_policy",
"cache_policy",
"cache_blacklist",
"cache_whitelist",
)
Expand Down
Loading

0 comments on commit f70cabb

Please sign in to comment.