feat: Add support for data store status monitoring (#252)
The client instance will now provide access to a
`data_store_status_provider`. This provider allows developers to
retrieve the SDK's data store status on demand, or receive status
changes asynchronously by registering listeners.
keelerm84 committed Nov 21, 2023
1 parent 4df1762 commit 57ca6ac
Showing 19 changed files with 621 additions and 20 deletions.
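For context, here is a minimal usage sketch of the new provider. The SDK key is a placeholder, and the `available` attribute on the status object is assumed from how `DataStoreStatus` is constructed in this commit.

import ldclient
from ldclient.config import Config

ldclient.set_config(Config("my-sdk-key"))  # placeholder SDK key
client = ldclient.get()

# Check the persistent store status on demand.
status = client.data_store_status_provider.status
print("store available:", status.available)

# Or register a listener to be notified asynchronously when the status changes.
def on_store_status_change(new_status):
    print("data store available:", new_status.available)

client.data_store_status_provider.add_listener(on_store_status_change)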
133 changes: 121 additions & 12 deletions ldclient/client.py
@@ -2,7 +2,7 @@
This submodule contains the client class that provides most of the SDK functionality.
"""

from typing import Optional, Any, Dict, Mapping, Union, Tuple
from typing import Optional, Any, Dict, Mapping, Union, Tuple, Callable

from .impl import AnyNum

@@ -21,52 +21,142 @@
from ldclient.impl.datasource.polling import PollingUpdateProcessor
from ldclient.impl.datasource.streaming import StreamingUpdateProcessor
from ldclient.impl.datasource.status import DataSourceUpdateSinkImpl, DataSourceStatusProviderImpl
from ldclient.impl.datastore.status import DataStoreUpdateSinkImpl, DataStoreStatusProviderImpl
from ldclient.impl.evaluator import Evaluator, error_reason
from ldclient.impl.events.diagnostics import create_diagnostic_id, _DiagnosticAccumulator
from ldclient.impl.events.event_processor import DefaultEventProcessor
from ldclient.impl.events.types import EventFactory
from ldclient.impl.model.feature_flag import FeatureFlag
from ldclient.impl.listeners import Listeners
from ldclient.impl.rwlock import ReadWriteLock
from ldclient.impl.stubs import NullEventProcessor, NullUpdateProcessor
from ldclient.impl.util import check_uwsgi, log
from ldclient.interfaces import BigSegmentStoreStatusProvider, DataSourceStatusProvider, FeatureRequester, FeatureStore, FlagTracker
from ldclient.impl.repeating_task import RepeatingTask
from ldclient.interfaces import BigSegmentStoreStatusProvider, DataSourceStatusProvider, FeatureStore, FlagTracker, DataStoreUpdateSink, DataStoreStatus, DataStoreStatusProvider
from ldclient.versioned_data_kind import FEATURES, SEGMENTS, VersionedDataKind
from ldclient.feature_store import FeatureStore
from ldclient.migrations import Stage, OpTracker
from ldclient.impl.flag_tracker import FlagTrackerImpl

from threading import Lock




class _FeatureStoreClientWrapper(FeatureStore):
"""Provides additional behavior that the client requires before or after feature store operations.
Currently this just means sorting the data set for init(). In the future we may also use this
to provide an update listener capability.
Currently this just means sorting the data set for init() and dealing with data store status listeners.
"""

def __init__(self, store: FeatureStore):
def __init__(self, store: FeatureStore, store_update_sink: DataStoreUpdateSink):
self.store = store
self.__store_update_sink = store_update_sink
self.__monitoring_enabled = self.is_monitoring_enabled()

# Covers the following variables
self.__lock = ReadWriteLock()
self.__last_available = True
self.__poller: Optional[RepeatingTask] = None

def init(self, all_data: Mapping[VersionedDataKind, Mapping[str, Dict[Any, Any]]]):
return self.store.init(_FeatureStoreDataSetSorter.sort_all_collections(all_data))
return self.__wrapper(lambda: self.store.init(_FeatureStoreDataSetSorter.sort_all_collections(all_data)))

def get(self, kind, key, callback):
return self.store.get(kind, key, callback)
return self.__wrapper(lambda: self.store.get(kind, key, callback))

def all(self, kind, callback):
return self.store.all(kind, callback)
return self.__wrapper(lambda: self.store.all(kind, callback))

def delete(self, kind, key, version):
return self.store.delete(kind, key, version)
return self.__wrapper(lambda: self.store.delete(kind, key, version))

def upsert(self, kind, item):
return self.store.upsert(kind, item)
return self.__wrapper(lambda: self.store.upsert(kind, item))

@property
def initialized(self) -> bool:
return self.store.initialized

def __wrapper(self, fn: Callable):
try:
return fn()
except BaseException:
if self.__monitoring_enabled:
self.__update_availability(False)
raise

def __update_availability(self, available: bool):
try:
self.__lock.lock()
if available == self.__last_available:
return
self.__last_available = available
finally:
self.__lock.unlock()

status = DataStoreStatus(available, False)

if available:
log.warn("Persistent store is available again")

self.__store_update_sink.update_status(status)

if available:
try:
self.__lock.lock()
if self.__poller is not None:
self.__poller.stop()
self.__poller = None
finally:
self.__lock.unlock()

return

log.warn("Detected persistent store unavailability; updates will be cached until it recovers")
task = RepeatingTask(0.5, 0, self.__check_availability)

self.__lock.lock()
self.__poller = task
self.__poller.start()
self.__lock.unlock()

def __check_availability(self):
try:
if self.store.is_available():
self.__update_availability(True)
except BaseException as e:
log.error("Unexpected error from data store status function: %s", e)

def is_monitoring_enabled(self) -> bool:
"""
This method determines whether the wrapped store can support enabling monitoring.
The wrapped store must provide an is_monitoring_enabled method, which must
return True. But this alone is not sufficient.
Because this class wraps all interactions with a provided store, it can
technically "monitor" any store. However, monitoring also requires that
we notify listeners when the store is available again.
We determine this by checking for the store's `is_available` method, so this
is also a requirement for monitoring support.
These extra checks won't be necessary once `is_available` becomes a part
of the core interface requirements and this class no longer wraps every
feature store.
"""

if not hasattr(self.store, 'is_monitoring_enabled'):
return False

if not hasattr(self.store, 'is_available'):
return False

monitoring_enabled = getattr(self.store, 'is_monitoring_enabled')
if not callable(monitoring_enabled):
return False

return monitoring_enabled()


def _get_store_item(store, kind: VersionedDataKind, key: str) -> Any:
# This decorator around store.get provides backward compatibility with any custom data
@@ -102,7 +192,11 @@ def __init__(self, config: Config, start_wait: float=5):
self._event_factory_default = EventFactory(False)
self._event_factory_with_reasons = EventFactory(True)

store = _FeatureStoreClientWrapper(self._config.feature_store)
data_store_listeners = Listeners()
store_sink = DataStoreUpdateSinkImpl(data_store_listeners)
store = _FeatureStoreClientWrapper(self._config.feature_store, store_sink)

self.__data_store_status_provider = DataStoreStatusProviderImpl(store, store_sink)

data_source_listeners = Listeners()
flag_change_listeners = Listeners()
@@ -515,6 +609,21 @@ def data_source_status_provider(self) -> DataSourceStatusProvider:
"""
return self.__data_source_status_provider

@property
def data_store_status_provider(self) -> DataStoreStatusProvider:
"""
Returns an interface for tracking the status of a persistent data store.
The provider has methods for checking whether the data store is (as far
as the SDK knows) currently operational, tracking changes in this
status, and getting cache statistics. These are only relevant for a
persistent data store; if you are using an in-memory data store, then
this method will return a stub object that provides no information.
:return: The data store status provider
"""
return self.__data_store_status_provider

@property
def flag_tracker(self) -> FlagTracker:
"""
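Taken together, the wrapper's checks above mean a custom persistent store opts into monitoring by exposing both `is_monitoring_enabled` and `is_available`. A rough sketch of such a store follows; the class and its `_ping_backend` helper are hypothetical and not part of this commit.

class MyPersistentFeatureStore:
    """Hypothetical store that opts into availability monitoring."""

    def is_monitoring_enabled(self) -> bool:
        # Must exist, be callable, and return True for the wrapper to monitor the store.
        return True

    def is_available(self) -> bool:
        # Polled roughly every 0.5 seconds by the wrapper after a failure,
        # until the store reports that it has recovered.
        try:
            self._ping_backend()  # hypothetical cheap connectivity check
            return True
        except BaseException:
            return False

    def _ping_backend(self):
        # Placeholder for a cheap read against the real backend.
        pass

    # In practice the class would also implement the FeatureStore interface
    # (init, get, all, upsert, delete, initialized).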
6 changes: 6 additions & 0 deletions ldclient/feature_store.py
@@ -78,6 +78,12 @@ def __init__(self):
self._initialized = False
self._items = defaultdict(dict)

def is_monitoring_enabled(self) -> bool:
return False

def is_available(self) -> bool:
return True

def get(self, kind: VersionedDataKind, key: str, callback: Callable[[Any], Any]=lambda x: x) -> Any:
"""
"""
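With the default in-memory store, then, the new provider reports that monitoring is unavailable. A quick check, assuming the default-configured `client` from the earlier sketch:

# InMemoryFeatureStore.is_monitoring_enabled() returns False, so the provider does too.
print(client.data_store_status_provider.is_monitoring_enabled())  # False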
9 changes: 9 additions & 0 deletions ldclient/feature_store_helpers.py
@@ -32,12 +32,21 @@ def __init__(self, core: FeatureStoreCore, cache_config: CacheConfig):
:param cache_config: the caching parameters
"""
self._core = core
self.__has_available_method = callable(getattr(core, 'is_available', None))

if cache_config.enabled:
self._cache = ExpiringDict(max_len=cache_config.capacity, max_age_seconds=cache_config.expiration)
else:
self._cache = None
self._inited = False

def is_monitoring_enabled(self) -> bool:
return self.__has_available_method

def is_available(self) -> bool:
# We know is_available exists since we are checking __has_available_method
return self._core.is_available() if self.__has_available_method else False # type: ignore

def init(self, all_encoded_data: Mapping[VersionedDataKind, Mapping[str, Dict[Any, Any]]]):
"""
"""
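The capability detection above is just a getattr/callable probe on the wrapped core. A standalone illustration (the core classes here are made up for the example):

class CoreWithAvailability:
    def is_available(self) -> bool:
        return True


class CoreWithoutAvailability:
    pass


def supports_monitoring(core) -> bool:
    # Mirrors the check performed in CachingStoreWrapper.__init__ above.
    return callable(getattr(core, 'is_available', None))


assert supports_monitoring(CoreWithAvailability())
assert not supports_monitoring(CoreWithoutAvailability())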
6 changes: 3 additions & 3 deletions ldclient/impl/datasource/status.py
@@ -147,13 +147,13 @@ def __compute_changed_items_for_full_data_set(self, old_data: Mapping[VersionedD


class DataSourceStatusProviderImpl(DataSourceStatusProvider):
def __init__(self, listeners: Listeners, updates_sink: DataSourceUpdateSinkImpl):
def __init__(self, listeners: Listeners, update_sink: DataSourceUpdateSinkImpl):
self.__listeners = listeners
self.__updates_sink = updates_sink
self.__update_sink = update_sink

@property
def status(self) -> DataSourceStatus:
return self.__updates_sink.status
return self.__update_sink.status

def add_listener(self, listener: Callable[[DataSourceStatus], None]):
self.__listeners.add(listener)
Empty file.
56 changes: 56 additions & 0 deletions ldclient/impl/datastore/status.py
@@ -0,0 +1,56 @@
from __future__ import annotations
from typing import Callable, TYPE_CHECKING
from copy import copy

from ldclient.interfaces import DataStoreStatusProvider, DataStoreStatus, DataStoreUpdateSink
from ldclient.impl.listeners import Listeners
from ldclient.impl.rwlock import ReadWriteLock

if TYPE_CHECKING:
from ldclient.client import _FeatureStoreClientWrapper


class DataStoreUpdateSinkImpl(DataStoreUpdateSink):
def __init__(self, listeners: Listeners):
self.__listeners = listeners

self.__lock = ReadWriteLock()
self.__status = DataStoreStatus(True, False)

@property
def listeners(self) -> Listeners:
return self.__listeners

def status(self) -> DataStoreStatus:
self.__lock.rlock()
status = copy(self.__status)
self.__lock.runlock()

return status

def update_status(self, status: DataStoreStatus):
self.__lock.lock()
old_value, self.__status = self.__status, status
self.__lock.unlock()

if old_value != status:
self.__listeners.notify(status)


class DataStoreStatusProviderImpl(DataStoreStatusProvider):
def __init__(self, store: _FeatureStoreClientWrapper, update_sink: DataStoreUpdateSinkImpl):
self.__store = store
self.__update_sink = update_sink

@property
def status(self) -> DataStoreStatus:
return self.__update_sink.status()

def is_monitoring_enabled(self) -> bool:
return self.__store.is_monitoring_enabled()

def add_listener(self, listener: Callable[[DataStoreStatus], None]):
self.__update_sink.listeners.add(listener)

def remove_listener(self, listener: Callable[[DataStoreStatus], None]):
self.__update_sink.listeners.remove(listener)
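A short sketch of how these two classes cooperate: the wrapper pushes a new `DataStoreStatus` into the sink, and the sink notifies any listeners that were registered through the provider. The snippet below wires the pieces together directly, outside the client, purely for illustration, and assumes the status object exposes an `available` attribute as its constructor suggests.

from ldclient.impl.listeners import Listeners
from ldclient.impl.datastore.status import DataStoreUpdateSinkImpl
from ldclient.interfaces import DataStoreStatus

listeners = Listeners()
sink = DataStoreUpdateSinkImpl(listeners)

def on_change(status):
    print("store available:", status.available)

listeners.add(on_change)

# Simulate the wrapper reporting an outage; on_change fires because the status changed.
sink.update_status(DataStoreStatus(False, False))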
7 changes: 7 additions & 0 deletions ldclient/impl/integrations/consul/consul_feature_store.py
@@ -45,6 +45,13 @@ def __init__(self, host, port, prefix, consul_opts):
self._prefix = ("launchdarkly" if prefix is None else prefix) + "/"
self._client = consul.Consul(**opts)

def is_available(self) -> bool:
try:
self._client.kv.get(self._inited_key())
return True
except BaseException:
return False

def init_internal(self, all_data):
# Start by reading the existing keys; we will later delete any of these that weren't in all_data.
index, keys = self._client.kv.get(self._prefix, recurse=True, keys=True)
8 changes: 8 additions & 0 deletions ldclient/impl/integrations/dynamodb/dynamodb_feature_store.py
@@ -53,6 +53,14 @@ def __init__(self, table_name, prefix, dynamodb_opts):
self._prefix = (prefix + ":") if prefix else ""
self._client = boto3.client('dynamodb', **dynamodb_opts)

def is_available(self) -> bool:
try:
inited_key = self._inited_key()
self._get_item_by_keys(inited_key, inited_key)
return True
except BaseException:
return False

def init_internal(self, all_data):
# Start by reading the existing keys; we will later delete any of these that weren't in all_data.
unused_old_keys = self._read_existing_keys(all_data.keys())
7 changes: 7 additions & 0 deletions ldclient/impl/integrations/redis/redis_feature_store.py
@@ -24,6 +24,13 @@ def __init__(self, url, prefix, redis_opts: Dict[str, Any]):
self.test_update_hook = None # exposed for testing
log.info("Started RedisFeatureStore connected to URL: " + redact_password(url) + " using prefix: " + self._prefix)

def is_available(self) -> bool:
try:
self.initialized_internal()
return True
except BaseException:
return False

def _items_key(self, kind):
return "{0}:{1}".format(self._prefix, kind.namespace)
