-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathdata_store.py
100 lines (84 loc) · 3.15 KB
/
data_store.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
"""Module to handle data storage in Redis"""
import logging
import redis
class DataStore:
"""Class to handle data storage in Redis"""
## pylint: disable=too-many-arguments
def __init__(
self, instance_id=None, logger=None
):
"""
Initialize the DataStore with Redis connection parameters and instance ID.
:param host: Redis server hostname.
:param port: Redis server port.
:param db: Redis database number.
:param instance_id: Unique identifier for the instance.
:param logger: Custom logger instance.
"""
self.instance_id = instance_id
self.logger = logger if logger else logging.getLogger(__name__)
try:
self.r = redis.Redis()
self.r.ping() # Test the connection
except redis.ConnectionError as e:
self.logger.error("Failed to connect to Redis: %s", e)
raise
def __enter__(self):
"""
Return the instance of the class to be used in a with statement.
"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""
Close the connection to Redis.
"""
try:
if self.r:
self.r.close()
except redis.RedisError as e:
self.logger.error("Failed to close Redis connection: %s", e)
def _get_cache_key(self, key, instance_id=None):
"""
Get the cache key to use for the Redis instance.
:param key: Key to use in the cache.
:return: The cache key to use.
"""
return f"{instance_id}_{key}" if instance_id else f"{self.instance_id}_{key}"
def set_param(self, key, value, instance_id=None):
"""
Add a key-value pair to Redis.
:param key: Key to add.
:param value: Value to associate with the key.
"""
try:
self.r.set(self._get_cache_key(key, instance_id), value)
except redis.RedisError as e:
self.logger.error("Failed to set key %s in Redis: %s", key, e)
raise
def retrieve_param(self, key, instance_id=None, cast_to=float):
"""
Retrieve a value from Redis using the key.
:param key: Key to retrieve.
:param cast_to: Type to cast the retrieved value to.
:return: The value cast to the specified type or None if the key does not exist.
"""
try:
value = self.r.get(self._get_cache_key(key, instance_id))
if value is None:
return None
value = value.decode("utf-8") # Decode the byte value to string
return cast_to(value)
except (redis.RedisError, ValueError) as e:
self.logger.error("Failed to get or cast key %s in Redis: %s", key, e)
raise
def get_keys(self, instance_id=None):
"""
Get all keys stored in Redis.
:return: List of keys stored in Redis.
"""
try:
keys = self.r.keys(f"{instance_id}*")
return [key.decode("utf-8") for key in keys]
except redis.RedisError as e:
self.logger.error("Failed to get keys in Redis: %s", e)
raise