diff --git a/src/swsssdk/dbconnector.py b/src/swsssdk/dbconnector.py
index ee7f23f3b64a..d946061eeebb 100644
--- a/src/swsssdk/dbconnector.py
+++ b/src/swsssdk/dbconnector.py
@@ -247,11 +247,10 @@ def connect(self, db_name, retry_on=True):
         self.dbintf.redis_kwargs["port"] = self.get_db_port(db_name)
         self.dbintf.redis_kwargs["unix_socket_path"] = None
         db_id = self.get_dbid(db_name)
-        self.dbintf.connect(db_id, retry_on)
+        self.dbintf.connect(db_id, db_name, retry_on)
 
     def close(self, db_name):
-        db_id = self.get_dbid(db_name)
-        self.dbintf.close(db_id)
+        self.dbintf.close(db_name)
 
     def get_db_list(self):
         return SonicDBConfig.get_dblist(self.namespace)
@@ -275,43 +274,33 @@ def get_db_separator(self, db_name):
         return SonicDBConfig.get_separator(db_name, self.namespace)
 
     def get_redis_client(self, db_name):
-        db_id = self.get_dbid(db_name)
-        return self.dbintf.get_redis_client(db_id)
+        return self.dbintf.get_redis_client(db_name)
 
     def publish(self, db_name, channel, message):
-        db_id = self.get_dbid(db_name)
-        return self.dbintf.publish(db_id, channel, message)
+        return self.dbintf.publish(db_name, channel, message)
 
     def expire(self, db_name, key, timeout_sec):
-        db_id = self.get_dbid(db_name)
-        return self.dbintf.expire(db_id, key, timeout_sec)
+        return self.dbintf.expire(db_name, key, timeout_sec)
 
     def exists(self, db_name, key):
-        db_id = self.get_dbid(db_name)
-        return self.dbintf.exists(db_id, key)
+        return self.dbintf.exists(db_name, key)
 
     def keys(self, db_name, pattern='*', *args, **kwargs):
-        db_id = self.get_dbid(db_name)
-        return self.dbintf.keys(db_id, pattern, *args, **kwargs)
+        return self.dbintf.keys(db_name, pattern, *args, **kwargs)
 
     def get(self, db_name, _hash, key, *args, **kwargs):
-        db_id = self.get_dbid(db_name)
-        return self.dbintf.get(db_id, _hash, key, *args, **kwargs)
+        return self.dbintf.get(db_name, _hash, key, *args, **kwargs)
 
     def get_all(self, db_name, _hash, *args, **kwargs):
-        db_id = self.get_dbid(db_name)
-        return self.dbintf.get_all(db_id, _hash, *args, **kwargs)
+        return self.dbintf.get_all(db_name, _hash, *args, **kwargs)
 
     def set(self, db_name, _hash, key, val, *args, **kwargs):
-        db_id = self.get_dbid(db_name)
-        return self.dbintf.set(db_id, _hash, key, val, *args, **kwargs)
+        return self.dbintf.set(db_name, _hash, key, val, *args, **kwargs)
 
     def delete(self, db_name, key, *args, **kwargs):
-        db_id = self.get_dbid(db_name)
-        return self.dbintf.delete(db_id, key, *args, **kwargs)
+        return self.dbintf.delete(db_name, key, *args, **kwargs)
 
     def delete_all_by_pattern(self, db_name, pattern, *args, **kwargs):
-        db_id = self.get_dbid(db_name)
-        self.dbintf.delete_all_by_pattern(db_id, pattern, *args, **kwargs)
+        self.dbintf.delete_all_by_pattern(db_name, pattern, *args, **kwargs)
 
     pass
diff --git a/src/swsssdk/interface.py b/src/swsssdk/interface.py
index a3a13cb1b17a..005d806fe0bf 100644
--- a/src/swsssdk/interface.py
+++ b/src/swsssdk/interface.py
@@ -18,7 +18,7 @@ def blockable(f):
     class SonicV2Connector:
 
         @blockable
-        def keys(self, db_id):
+        def keys(self, db_name):
             # ...
 
     # call with:
@@ -29,26 +29,26 @@ def keys(self, db_id):
     """
     @wraps(f)
-    def wrapped(inst, db_id, *args, **kwargs):
+    def wrapped(inst, db_name, *args, **kwargs):
         blocking = kwargs.pop('blocking', False)
         attempts = 0
         while True:
             try:
-                ret_data = f(inst, db_id, *args, **kwargs)
-                inst._unsubscribe_keyspace_notification(db_id)
+                ret_data = f(inst, db_name, *args, **kwargs)
+                inst._unsubscribe_keyspace_notification(db_name)
                 return ret_data
             except UnavailableDataError as e:
                 if blocking:
-                    if db_id in inst.keyspace_notification_channels:
-                        result = inst._unavailable_data_handler(db_id, e.data)
+                    if db_name in inst.keyspace_notification_channels:
+                        result = inst._unavailable_data_handler(db_name, e.data)
                         if result:
                             continue  # received updates, try to read data again
                         else:
-                            inst._unsubscribe_keyspace_notification(db_id)
+                            inst._unsubscribe_keyspace_notification(db_name)
                             raise  # No updates was received. Raise exception
                     else:
                         # Subscribe to updates and try it again (avoiding race condition)
-                        inst._subscribe_keyspace_notification(db_id)
+                        inst._subscribe_keyspace_notification(db_name)
                 else:
                     return None
             except redis.exceptions.ResponseError:
@@ -57,12 +57,12 @@ def wrapped(inst, db_id, *args, **kwargs):
                 """
                 Retrying the request won't pass unless the schema itself changes. In this case, the error
                 should be attributed to the application itself. Re-raise the error.
                 """
-                logger.exception("Bad DB request [{}:{}]{{ {} }}".format(db_id, f.__name__, str(args)))
+                logger.exception("Bad DB request [{}:{}]{{ {} }}".format(db_name, f.__name__, str(args)))
                 raise
             except (redis.exceptions.RedisError, OSError):
                 attempts += 1
-                inst._connection_error_handler(db_id)
-                msg = "DB access failure by [{}:{}]{{ {} }}".format(db_id, f.__name__, str(args))
+                inst._connection_error_handler(db_name)
+                msg = "DB access failure by [{}:{}]{{ {} }}".format(db_name, f.__name__, str(args))
                 if BLOCKING_ATTEMPT_ERROR_THRESHOLD < attempts < BLOCKING_ATTEMPT_SUPPRESSION:
                     # Repeated access failures implies the database itself is unhealthy.
                     logger.exception(msg=msg)
@@ -75,7 +75,7 @@ def wrapped(inst, db_id, *args, **kwargs):
 class DBRegistry(dict):
     def __getitem__(self, item):
         if item not in self:
-            raise MissingClientError("No client connected for db_id '{}'".format(item))
+            raise MissingClientError("No client connected for db_name '{}'".format(item))
         return dict.__getitem__(self, item)
 
 
@@ -159,128 +159,133 @@ def __init__(self, **kwargs):
         # notifications for each client
         self.keyspace_notification_channels = DBRegistry()
 
-    def connect(self, db_id, retry_on=True):
+    def connect(self, db_id, db_name, retry_on=True):
         """
         :param db_id: database id to connect to
+        :param db_name: database name to connect to
         :param retry_on: if ``True`` -- will attempt to connect continuously.
         if ``False``, only one attempt will be made.
         """
         if retry_on:
-            self._persistent_connect(db_id)
+            self._persistent_connect(db_id, db_name)
         else:
-            self._onetime_connect(db_id)
+            self._onetime_connect(db_id, db_name)
 
-    def _onetime_connect(self, db_id):
+    def _onetime_connect(self, db_id, db_name):
         """
         Connect to database id.
         """
         if db_id is None:
             raise ValueError("No database ID configured for '{}'".format(db_id))
-        client = redis.StrictRedis(db=db_id, **self.redis_kwargs)
+        if db_name is None:
+            raise ValueError("No database Name configured for '{}'".format(db_name))
 
-        # Enable the notification mechanism for keyspace events in Redis
-        client.config_set('notify-keyspace-events', self.KEYSPACE_EVENTS)
-        self.redis_clients[db_id] = client
+        if db_name not in self.redis_clients.keys():
+            client = redis.StrictRedis(db=db_id, **self.redis_kwargs)
 
-    def _persistent_connect(self, db_id):
+            # Enable the notification mechanism for keyspace events in Redis
+            client.config_set('notify-keyspace-events', self.KEYSPACE_EVENTS)
+            self.redis_clients[db_name] = client
+
+    def _persistent_connect(self, db_id, db_name):
         """
         Keep reconnecting to Database 'db_id' until success
         """
         while True:
             try:
-                self._onetime_connect(db_id)
+                self._onetime_connect(db_id, db_name)
                 return
             except RedisError:
                 t_wait = self.CONNECT_RETRY_WAIT_TIME
-                logger.warning("Connecting to DB '{}' failed, will retry in {}s".format(db_id, t_wait))
-                self.close(db_id)
+                logger.warning("Connecting to DB '{} {}' failed, will retry in {}s".format(db_id, db_name, t_wait))
+                self.close(db_name)
                 time.sleep(t_wait)
 
-    def close(self, db_id):
+    def close(self, db_name):
         """
         Close all client(s) / keyspace channels.
-        :param db_id: DB to disconnect from.
+        :param db_name: DB to disconnect from.
         """
-        if db_id in self.redis_clients:
-            self.redis_clients[db_id].connection_pool.disconnect()
-        if db_id in self.keyspace_notification_channels:
-            self.keyspace_notification_channels[db_id].close()
+        if db_name in self.redis_clients:
+            self.redis_clients[db_name].connection_pool.disconnect()
+        if db_name in self.keyspace_notification_channels:
+            self.keyspace_notification_channels[db_name].close()
 
-    def _subscribe_keyspace_notification(self, db_id):
+    def _subscribe_keyspace_notification(self, db_name):
         """
         Subscribe the chosent client to keyspace event notifications
         """
         logger.debug("Subscribe to keyspace notification")
-        client = self.redis_clients[db_id]
+        client = self.redis_clients[db_name]
         pubsub = client.pubsub()
         pubsub.psubscribe(self.KEYSPACE_PATTERN)
-        self.keyspace_notification_channels[db_id] = pubsub
+        self.keyspace_notification_channels[db_name] = pubsub
 
-    def _unsubscribe_keyspace_notification(self, db_id):
+    def _unsubscribe_keyspace_notification(self, db_name):
        """
        Unsubscribe the chosent client from keyspace event notifications
        """
-        if db_id in self.keyspace_notification_channels:
+        if db_name in self.keyspace_notification_channels:
            logger.debug("Unsubscribe from keyspace notification")
-            self.keyspace_notification_channels[db_id].close()
-            del self.keyspace_notification_channels[db_id]
+            self.keyspace_notification_channels[db_name].close()
+            del self.keyspace_notification_channels[db_name]
 
-    def get_redis_client(self, db_id):
+    def get_redis_client(self, db_name):
         """
-        :param db_id: Name of the DB to query
+        :param db_name: Name of the DB to query
         :return: The Redis client instance.
         """
-        return self.redis_clients[db_id]
+        return self.redis_clients[db_name]
 
-    def publish(self, db_id, channel, message):
+    def publish(self, db_name, channel, message):
         """
         Publish message via the channel
         """
-        client = self.redis_clients[db_id]
+        client = self.redis_clients[db_name]
         return client.publish(channel, message)
 
-    def expire(self, db_id, key, timeout_sec):
+    def expire(self, db_name, key, timeout_sec):
         """
         Set a timeout on a key
         """
-        client = self.redis_clients[db_id]
+        client = self.redis_clients[db_name]
         return client.expire(key, timeout_sec)
 
-    def exists(self, db_id, key):
+    def exists(self, db_name, key):
         """
         Check if a key exist in the db
         """
-        client = self.redis_clients[db_id]
+        client = self.redis_clients[db_name]
         return client.exists(key)
 
     @blockable
-    def keys(self, db_id, pattern='*'):
+    def keys(self, db_name, pattern='*'):
         """
-        Retrieve all the keys of DB %db_id
+        Retrieve all the keys of DB %db_name
         """
-        client = self.redis_clients[db_id]
+        client = self.redis_clients[db_name]
         keys = client.keys(pattern=pattern)
         if not keys:
-            message = "DB '{}' is empty!".format(db_id)
+            message = "DB '{}' is empty!".format(db_name)
             logger.warning(message)
             raise UnavailableDataError(message, b'hset')
         else:
             return keys
 
     @blockable
-    def get(self, db_id, _hash, key):
+    def get(self, db_name, _hash, key):
         """
         Retrieve the value of Key %key from Hashtable %hash
-        in Database %db_id
+        in Database %db_name
         Parameter %blocking indicates whether to wait
         when the query fails
         """
-        client = self.redis_clients[db_id]
+        client = self.redis_clients[db_name]
         val = client.hget(_hash, key)
         if not val:
-            message = "Key '{}' field '{}' unavailable in database '{}'".format(_hash, key, db_id)
+            message = "Key '{}' field '{}' unavailable in database '{}'".format(_hash, key, db_name)
             logger.warning(message)
             raise UnavailableDataError(message, _hash)
         else:
@@ -288,17 +293,17 @@ def get(self, db_id, _hash, key):
             return None if val == b'None' else val
 
     @blockable
-    def get_all(self, db_id, _hash):
+    def get_all(self, db_name, _hash):
         """
-        Get Hashtable %hash from DB %db_id
+        Get Hashtable %hash from DB %db_name
         Parameter %blocking indicates whether to wait
         if the hashtable has not been created yet
         """
-        client = self.redis_clients[db_id]
+        client = self.redis_clients[db_name]
         table = client.hgetall(_hash)
         if not table:
-            message = "Key '{}' unavailable in database '{}'".format(_hash, db_id)
+            message = "Key '{}' unavailable in database '{}'".format(_hash, db_name)
             logger.warning(message)
             raise UnavailableDataError(message, _hash)
         else:
@@ -306,35 +311,35 @@ def get_all(self, db_id, _hash):
             return {k: None if v == b'None' else v for k, v in table.items()}
 
     @blockable
-    def set(self, db_id, _hash, key, val):
+    def set(self, db_name, _hash, key, val):
         """
-        Add %(key, val) to Hashtable %hash in DB %db_id
+        Add %(key, val) to Hashtable %hash in DB %db_name
         Parameter %blocking indicates whether to retry in case of failure
         """
-        client = self.redis_clients[db_id]
+        client = self.redis_clients[db_name]
         return client.hset(_hash, key, val)
 
     @blockable
-    def delete(self, db_id, key):
+    def delete(self, db_name, key):
         """
-        Delete %key from DB %db_id
+        Delete %key from DB %db_name
         Parameter %blocking indicates whether to retry in case of failure
         """
-        client = self.redis_clients[db_id]
+        client = self.redis_clients[db_name]
         return client.delete(key)
 
     @blockable
-    def delete_all_by_pattern(self, db_id, pattern):
+    def delete_all_by_pattern(self, db_name, pattern):
         """
-        Delete all keys which match %pattern from DB %db_id
+        Delete all keys which match %pattern from DB %db_name
         Parameter %blocking indicates whether to retry in case of failure
         """
-        client = self.redis_clients[db_id]
+        client = self.redis_clients[db_name]
         keys = client.keys(pattern)
         for key in keys:
             client.delete(key)
 
-    def _unavailable_data_handler(self, db_id, data):
+    def _unavailable_data_handler(self, db_name, data):
         """
         When the queried config is not available in Redis--wait until it is available.
         Two timeouts are at work here:
@@ -342,23 +347,23 @@ def _unavailable_data_handler(self, db_id, data):
         2. Max data wait - swsssdk-specific. how long to wait for the data to populate (in absolute time)
         """
         start = time.time()
-        logger.debug("Listening on pubsub channel '{}'".format(db_id))
+        logger.debug("Listening on pubsub channel '{}'".format(db_name))
         while time.time() - start < self.PUB_SUB_MAXIMUM_DATA_WAIT:
-            msg = self.keyspace_notification_channels[db_id].get_message(timeout=self.PUB_SUB_NOTIFICATION_TIMEOUT)
+            msg = self.keyspace_notification_channels[db_name].get_message(timeout=self.PUB_SUB_NOTIFICATION_TIMEOUT)
             if msg is not None and msg.get('data') == data:
-                logger.info("'{}' acquired via pub-sub. Unblocking...".format(data, db_id))
+                logger.info("'{}' acquired via pub-sub. Unblocking...".format(data, db_name))
                 # Wait for a "settling" period before releasing the wait.
                 time.sleep(self.DATA_RETRIEVAL_WAIT_TIME)
                 return True
-        logger.warning("No notification for '{}' from '{}' received before timeout.".format(data, db_id))
+        logger.warning("No notification for '{}' from '{}' received before timeout.".format(data, db_name))
         return False
 
-    def _connection_error_handler(self, db_id):
+    def _connection_error_handler(self, db_name):
         """
         In the event Redis is unavailable, close existing connections, and try again.
         """
         logger.warning('Could not connect to Redis--waiting before trying again.')
-        self.close(db_id)
+        self.close(db_name)
         time.sleep(self.CONNECT_RETRY_WAIT_TIME)
-        self.connect(db_id, True)
+        self.connect(db_name, True)
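
Note: below is a minimal, hypothetical caller-side sketch of the name-keyed API this patch moves to, assuming the dbconnector.py methods above belong to SonicV2Connector and that the connector is constructed with its default connection settings. The database name "APPL_DB", the "PORT_TABLE" keys, and the "oper_status" field are illustrative placeholders, not values taken from this patch.

    from swsssdk import SonicV2Connector

    # Assumed construction with default connection kwargs; host/port/unix-socket
    # selection is resolved per database name by the connector itself.
    db = SonicV2Connector()

    # connect() still takes the database *name*; internally the connector looks up
    # the numeric db_id and, after this patch, caches the Redis client under db_name.
    db.connect("APPL_DB")

    # All accessors are keyed by db_name rather than db_id after this change.
    port_keys = db.keys("APPL_DB", pattern="PORT_TABLE:*")
    oper_status = db.get("APPL_DB", "PORT_TABLE:Ethernet0", "oper_status")
    port_entry = db.get_all("APPL_DB", "PORT_TABLE:Ethernet0")

    # close() is now keyed by name as well; the db_id lookup was dropped from it.
    db.close("APPL_DB")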