Skip to content

Commit

Permalink
feat: Add data source status provider support (#228)
Browse files Browse the repository at this point in the history
The client instance will now provide access to a
`data_source_status_provider`. This provider allows developers to
retrieve the status of the SDK on demand, or through registered
listeners.
  • Loading branch information
keelerm84 committed Nov 21, 2023
1 parent 99aafd5 commit f733d07
Show file tree
Hide file tree
Showing 14 changed files with 859 additions and 55 deletions.
23 changes: 22 additions & 1 deletion ldclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@
from ldclient.impl.datasource.feature_requester import FeatureRequesterImpl
from ldclient.impl.datasource.polling import PollingUpdateProcessor
from ldclient.impl.datasource.streaming import StreamingUpdateProcessor
from ldclient.impl.datasource.status import DataSourceUpdateSinkImpl, DataSourceStatusProviderImpl
from ldclient.impl.evaluator import Evaluator, error_reason
from ldclient.impl.events.diagnostics import create_diagnostic_id, _DiagnosticAccumulator
from ldclient.impl.events.event_processor import DefaultEventProcessor
from ldclient.impl.events.types import EventFactory
from ldclient.impl.model.feature_flag import FeatureFlag
from ldclient.impl.listeners import Listeners
from ldclient.impl.stubs import NullEventProcessor, NullUpdateProcessor
from ldclient.impl.util import check_uwsgi, log
from ldclient.interfaces import BigSegmentStoreStatusProvider, FeatureRequester, FeatureStore
from ldclient.interfaces import BigSegmentStoreStatusProvider, DataSourceStatusProvider, FeatureRequester, FeatureStore
from ldclient.versioned_data_kind import FEATURES, SEGMENTS, VersionedDataKind
from ldclient.feature_store import FeatureStore
from ldclient.migrations import Stage, OpTracker
Expand Down Expand Up @@ -100,6 +102,10 @@ def __init__(self, config: Config, start_wait: float=5):
self._event_factory_with_reasons = EventFactory(True)

store = _FeatureStoreClientWrapper(self._config.feature_store)

listeners = Listeners()
self._config._data_source_update_sink = DataSourceUpdateSinkImpl(store, listeners)
self.__data_source_status_provider = DataSourceStatusProviderImpl(listeners, self._config._data_source_update_sink)
self._store = store # type: FeatureStore

big_segment_store_manager = BigSegmentStoreManager(self._config.big_segments)
Expand Down Expand Up @@ -489,5 +495,20 @@ def big_segment_store_status_provider(self) -> BigSegmentStoreStatusProvider:
"""
return self.__big_segment_store_manager.status_provider

@property
def data_source_status_provider(self) -> DataSourceStatusProvider:
"""
Returns an interface for tracking the status of the data source.
The data source is the mechanism that the SDK uses to get feature flag configurations, such
as a streaming connection (the default) or poll requests. The
:class:`ldclient.interfaces.DataSourceStatusProvider` has methods for checking whether the
data source is (as far as the SDK knows) currently operational and tracking changes in this
status.
:return: The data source status provider
"""
return self.__data_source_status_provider


__all__ = ['LDClient', 'Config']
17 changes: 16 additions & 1 deletion ldclient/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from ldclient.feature_store import InMemoryFeatureStore
from ldclient.impl.util import log, validate_application_info
from ldclient.interfaces import BigSegmentStore, EventProcessor, FeatureStore, UpdateProcessor
from ldclient.interfaces import BigSegmentStore, EventProcessor, FeatureStore, UpdateProcessor, DataSourceUpdateSink

GET_LATEST_FEATURES_PATH = '/sdk/latest-flags'
STREAM_FLAGS_PATH = '/flags'
Expand Down Expand Up @@ -269,6 +269,7 @@ def __init__(self,
self.__http = http
self.__big_segments = BigSegmentsConfig() if not big_segments else big_segments
self.__application = validate_application_info(application or {}, log)
self._data_source_update_sink: Optional[DataSourceUpdateSink] = None

def copy_with_new_sdk_key(self, new_sdk_key: str) -> 'Config':
"""Returns a new ``Config`` instance that is the same as this one, except for having a different SDK key.
Expand Down Expand Up @@ -440,6 +441,20 @@ def application(self) -> dict:
"""
return self.__application

@property
def data_source_update_sink(self) -> Optional[DataSourceUpdateSink]:
"""
Returns the component that allows a data source to push data into the SDK.
This property should only be set by the SDK. Long term access of this
property is not supported; it is temporarily being exposed to maintain
backwards compatibility while the SDK structure is updated.
Custom data source implementations should integrate with this sink if
they want to provide support for data source status listeners.
"""
return self._data_source_update_sink

def _validate(self):
if self.offline is False and self.sdk_key is None or self.sdk_key == '':
log.warning("Missing or blank sdk_key.")
Expand Down
68 changes: 61 additions & 7 deletions ldclient/impl/datasource/polling.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@
from ldclient.config import Config
from ldclient.impl.repeating_task import RepeatingTask
from ldclient.impl.util import UnsuccessfulResponseException, http_error_message, is_http_error_recoverable, log
from ldclient.interfaces import FeatureRequester, FeatureStore, UpdateProcessor
from ldclient.interfaces import FeatureRequester, FeatureStore, UpdateProcessor, DataSourceUpdateSink, DataSourceErrorInfo, DataSourceErrorKind, DataSourceState

import time
from typing import Optional


class PollingUpdateProcessor(UpdateProcessor):
def __init__(self, config: Config, requester: FeatureRequester, store: FeatureStore, ready: Event):
self._config = config
self._data_source_update_sink: Optional[DataSourceUpdateSink] = config.data_source_update_sink
self._requester = requester
self._store = store
self._ready = ready
Expand All @@ -27,24 +31,74 @@ def initialized(self):
return self._ready.is_set() is True and self._store.initialized is True

def stop(self):
self.__stop_with_error_info(None)

def __stop_with_error_info(self, error: Optional[DataSourceErrorInfo]):
log.info("Stopping PollingUpdateProcessor")
self._task.stop()

if self._data_source_update_sink is None:
return

self._data_source_update_sink.update_status(
DataSourceState.OFF,
error
)

def _sink_or_store(self):
"""
The original implementation of this class relied on the feature store
directly, which we are trying to move away from. Customers who might have
instantiated this directly for some reason wouldn't know they have to set
the config's sink manually, so we have to fall back to the store if the
sink isn't present.
The next major release should be able to simplify this structure and
remove the need for fall back to the data store because the update sink
should always be present.
"""
if self._data_source_update_sink is None:
return self._store

return self._data_source_update_sink

def _poll(self):
try:
all_data = self._requester.get_all_data()
self._store.init(all_data)
self._sink_or_store().init(all_data)
if not self._ready.is_set() and self._store.initialized:
log.info("PollingUpdateProcessor initialized ok")
self._ready.set()

if self._data_source_update_sink is not None:
self._data_source_update_sink.update_status(DataSourceState.VALID, None)
except UnsuccessfulResponseException as e:
error_info = DataSourceErrorInfo(
DataSourceErrorKind.ERROR_RESPONSE,
e.status,
time.time(),
str(e)
)

http_error_message_result = http_error_message(e.status, "polling request")
if is_http_error_recoverable(e.status):
log.warning(http_error_message_result)
else:
if not is_http_error_recoverable(e.status):
log.error(http_error_message_result)
self._ready.set() # if client is initializing, make it stop waiting; has no effect if already inited
self.stop()
self._ready.set() # if client is initializing, make it stop waiting; has no effect if already inited
self.__stop_with_error_info(error_info)
else:
log.warning(http_error_message_result)

if self._data_source_update_sink is not None:
self._data_source_update_sink.update_status(
DataSourceState.INTERRUPTED,
error_info
)
except Exception as e:
log.exception(
'Error: Exception encountered when updating flags. %s' % e)

if self._data_source_update_sink is not None:
self._data_source_update_sink.update_status(
DataSourceState.INTERRUPTED,
DataSourceErrorInfo(DataSourceErrorKind.UNKNOWN, 0, time.time, str(e))
)
92 changes: 92 additions & 0 deletions ldclient/impl/datasource/status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
from ldclient.impl.listeners import Listeners
from ldclient.interfaces import DataSourceStatusProvider, DataSourceUpdateSink, DataSourceStatus, FeatureStore, DataSourceState, DataSourceErrorInfo, DataSourceErrorKind
from ldclient.impl.rwlock import ReadWriteLock
from ldclient.versioned_data_kind import VersionedDataKind

import time
from typing import Callable, Mapping, Optional


class DataSourceUpdateSinkImpl(DataSourceUpdateSink):
def __init__(self, store: FeatureStore, listeners: Listeners):
self.__store = store
self.__listeners = listeners

self.__lock = ReadWriteLock()
self.__status = DataSourceStatus(
DataSourceState.INITIALIZING,
time.time(),
None
)

@property
def status(self) -> DataSourceStatus:
try:
self.__lock.rlock()
return self.__status
finally:
self.__lock.runlock()

def init(self, all_data: Mapping[VersionedDataKind, Mapping[str, dict]]):
self.__monitor_store_update(lambda: self.__store.init(all_data))

def upsert(self, kind: VersionedDataKind, item: dict):
self.__monitor_store_update(lambda: self.__store.upsert(kind, item))

def delete(self, kind: VersionedDataKind, key: str, version: int):
self.__monitor_store_update(lambda: self.__store.delete(kind, key, version))

def update_status(self, new_state: DataSourceState, new_error: Optional[DataSourceErrorInfo]):
status_to_broadcast = None

try:
self.__lock.lock()
old_status = self.__status

if new_state == DataSourceState.INTERRUPTED and old_status.state == DataSourceState.INITIALIZING:
new_state = DataSourceState.INITIALIZING

if new_state == old_status.state and new_error is None:
return

self.__status = DataSourceStatus(
new_state,
self.__status.since if new_state == self.__status.state else time.time(),
self.__status.error if new_error is None else new_error
)

status_to_broadcast = self.__status
finally:
self.__lock.unlock()

if status_to_broadcast is not None:
self.__listeners.notify(status_to_broadcast)

def __monitor_store_update(self, fn: Callable[[], None]):
try:
fn()
except Exception as e:
error_info = DataSourceErrorInfo(
DataSourceErrorKind.STORE_ERROR,
0,
time.time(),
str(e)
)
self.update_status(DataSourceState.INTERRUPTED, error_info)
raise


class DataSourceStatusProviderImpl(DataSourceStatusProvider):
def __init__(self, listeners: Listeners, updates_sink: DataSourceUpdateSinkImpl):
self.__listeners = listeners
self.__updates_sink = updates_sink

@property
def status(self) -> DataSourceStatus:
return self.__updates_sink.status

def add_listener(self, listener: Callable[[DataSourceStatus], None]):
self.__listeners.add(listener)

def remove_listener(self, listener: Callable[[DataSourceStatus], None]):
self.__listeners.remove(listener)
Loading

0 comments on commit f733d07

Please sign in to comment.