diff --git a/ldclient/client.py b/ldclient/client.py index d7a24941..500e3cea 100644 --- a/ldclient/client.py +++ b/ldclient/client.py @@ -20,14 +20,16 @@ from ldclient.impl.datasource.feature_requester import FeatureRequesterImpl from ldclient.impl.datasource.polling import PollingUpdateProcessor from ldclient.impl.datasource.streaming import StreamingUpdateProcessor +from ldclient.impl.datasource.status import DataSourceUpdateSinkImpl, DataSourceStatusProviderImpl from ldclient.impl.evaluator import Evaluator, error_reason from ldclient.impl.events.diagnostics import create_diagnostic_id, _DiagnosticAccumulator from ldclient.impl.events.event_processor import DefaultEventProcessor from ldclient.impl.events.types import EventFactory from ldclient.impl.model.feature_flag import FeatureFlag +from ldclient.impl.listeners import Listeners from ldclient.impl.stubs import NullEventProcessor, NullUpdateProcessor from ldclient.impl.util import check_uwsgi, log -from ldclient.interfaces import BigSegmentStoreStatusProvider, FeatureRequester, FeatureStore +from ldclient.interfaces import BigSegmentStoreStatusProvider, DataSourceStatusProvider, FeatureRequester, FeatureStore from ldclient.versioned_data_kind import FEATURES, SEGMENTS, VersionedDataKind from ldclient.feature_store import FeatureStore from ldclient.migrations import Stage, OpTracker @@ -100,6 +102,10 @@ def __init__(self, config: Config, start_wait: float=5): self._event_factory_with_reasons = EventFactory(True) store = _FeatureStoreClientWrapper(self._config.feature_store) + + listeners = Listeners() + self._config._data_source_update_sink = DataSourceUpdateSinkImpl(store, listeners) + self.__data_source_status_provider = DataSourceStatusProviderImpl(listeners, self._config._data_source_update_sink) self._store = store # type: FeatureStore big_segment_store_manager = BigSegmentStoreManager(self._config.big_segments) @@ -489,5 +495,20 @@ def big_segment_store_status_provider(self) -> BigSegmentStoreStatusProvider: """ return self.__big_segment_store_manager.status_provider + @property + def data_source_status_provider(self) -> DataSourceStatusProvider: + """ + Returns an interface for tracking the status of the data source. + + The data source is the mechanism that the SDK uses to get feature flag configurations, such + as a streaming connection (the default) or poll requests. The + :class:`ldclient.interfaces.DataSourceStatusProvider` has methods for checking whether the + data source is (as far as the SDK knows) currently operational and tracking changes in this + status. + + :return: The data source status provider + """ + return self.__data_source_status_provider + __all__ = ['LDClient', 'Config'] diff --git a/ldclient/config.py b/ldclient/config.py index a84a8419..47d747eb 100644 --- a/ldclient/config.py +++ b/ldclient/config.py @@ -8,7 +8,7 @@ from ldclient.feature_store import InMemoryFeatureStore from ldclient.impl.util import log, validate_application_info -from ldclient.interfaces import BigSegmentStore, EventProcessor, FeatureStore, UpdateProcessor +from ldclient.interfaces import BigSegmentStore, EventProcessor, FeatureStore, UpdateProcessor, DataSourceUpdateSink GET_LATEST_FEATURES_PATH = '/sdk/latest-flags' STREAM_FLAGS_PATH = '/flags' @@ -269,6 +269,7 @@ def __init__(self, self.__http = http self.__big_segments = BigSegmentsConfig() if not big_segments else big_segments self.__application = validate_application_info(application or {}, log) + self._data_source_update_sink: Optional[DataSourceUpdateSink] = None def copy_with_new_sdk_key(self, new_sdk_key: str) -> 'Config': """Returns a new ``Config`` instance that is the same as this one, except for having a different SDK key. @@ -440,6 +441,20 @@ def application(self) -> dict: """ return self.__application + @property + def data_source_update_sink(self) -> Optional[DataSourceUpdateSink]: + """ + Returns the component that allows a data source to push data into the SDK. + + This property should only be set by the SDK. Long term access of this + property is not supported; it is temporarily being exposed to maintain + backwards compatibility while the SDK structure is updated. + + Custom data source implementations should integrate with this sink if + they want to provide support for data source status listeners. + """ + return self._data_source_update_sink + def _validate(self): if self.offline is False and self.sdk_key is None or self.sdk_key == '': log.warning("Missing or blank sdk_key.") diff --git a/ldclient/impl/datasource/polling.py b/ldclient/impl/datasource/polling.py index 68f61ebc..b53dcc2c 100644 --- a/ldclient/impl/datasource/polling.py +++ b/ldclient/impl/datasource/polling.py @@ -8,12 +8,16 @@ from ldclient.config import Config from ldclient.impl.repeating_task import RepeatingTask from ldclient.impl.util import UnsuccessfulResponseException, http_error_message, is_http_error_recoverable, log -from ldclient.interfaces import FeatureRequester, FeatureStore, UpdateProcessor +from ldclient.interfaces import FeatureRequester, FeatureStore, UpdateProcessor, DataSourceUpdateSink, DataSourceErrorInfo, DataSourceErrorKind, DataSourceState + +import time +from typing import Optional class PollingUpdateProcessor(UpdateProcessor): def __init__(self, config: Config, requester: FeatureRequester, store: FeatureStore, ready: Event): self._config = config + self._data_source_update_sink: Optional[DataSourceUpdateSink] = config.data_source_update_sink self._requester = requester self._store = store self._ready = ready @@ -27,24 +31,74 @@ def initialized(self): return self._ready.is_set() is True and self._store.initialized is True def stop(self): + self.__stop_with_error_info(None) + + def __stop_with_error_info(self, error: Optional[DataSourceErrorInfo]): log.info("Stopping PollingUpdateProcessor") self._task.stop() + if self._data_source_update_sink is None: + return + + self._data_source_update_sink.update_status( + DataSourceState.OFF, + error + ) + + def _sink_or_store(self): + """ + The original implementation of this class relied on the feature store + directly, which we are trying to move away from. Customers who might have + instantiated this directly for some reason wouldn't know they have to set + the config's sink manually, so we have to fall back to the store if the + sink isn't present. + + The next major release should be able to simplify this structure and + remove the need for fall back to the data store because the update sink + should always be present. + """ + if self._data_source_update_sink is None: + return self._store + + return self._data_source_update_sink + def _poll(self): try: all_data = self._requester.get_all_data() - self._store.init(all_data) + self._sink_or_store().init(all_data) if not self._ready.is_set() and self._store.initialized: log.info("PollingUpdateProcessor initialized ok") self._ready.set() + + if self._data_source_update_sink is not None: + self._data_source_update_sink.update_status(DataSourceState.VALID, None) except UnsuccessfulResponseException as e: + error_info = DataSourceErrorInfo( + DataSourceErrorKind.ERROR_RESPONSE, + e.status, + time.time(), + str(e) + ) + http_error_message_result = http_error_message(e.status, "polling request") - if is_http_error_recoverable(e.status): - log.warning(http_error_message_result) - else: + if not is_http_error_recoverable(e.status): log.error(http_error_message_result) - self._ready.set() # if client is initializing, make it stop waiting; has no effect if already inited - self.stop() + self._ready.set() # if client is initializing, make it stop waiting; has no effect if already inited + self.__stop_with_error_info(error_info) + else: + log.warning(http_error_message_result) + + if self._data_source_update_sink is not None: + self._data_source_update_sink.update_status( + DataSourceState.INTERRUPTED, + error_info + ) except Exception as e: log.exception( 'Error: Exception encountered when updating flags. %s' % e) + + if self._data_source_update_sink is not None: + self._data_source_update_sink.update_status( + DataSourceState.INTERRUPTED, + DataSourceErrorInfo(DataSourceErrorKind.UNKNOWN, 0, time.time, str(e)) + ) diff --git a/ldclient/impl/datasource/status.py b/ldclient/impl/datasource/status.py new file mode 100644 index 00000000..89034202 --- /dev/null +++ b/ldclient/impl/datasource/status.py @@ -0,0 +1,92 @@ +from ldclient.impl.listeners import Listeners +from ldclient.interfaces import DataSourceStatusProvider, DataSourceUpdateSink, DataSourceStatus, FeatureStore, DataSourceState, DataSourceErrorInfo, DataSourceErrorKind +from ldclient.impl.rwlock import ReadWriteLock +from ldclient.versioned_data_kind import VersionedDataKind + +import time +from typing import Callable, Mapping, Optional + + +class DataSourceUpdateSinkImpl(DataSourceUpdateSink): + def __init__(self, store: FeatureStore, listeners: Listeners): + self.__store = store + self.__listeners = listeners + + self.__lock = ReadWriteLock() + self.__status = DataSourceStatus( + DataSourceState.INITIALIZING, + time.time(), + None + ) + + @property + def status(self) -> DataSourceStatus: + try: + self.__lock.rlock() + return self.__status + finally: + self.__lock.runlock() + + def init(self, all_data: Mapping[VersionedDataKind, Mapping[str, dict]]): + self.__monitor_store_update(lambda: self.__store.init(all_data)) + + def upsert(self, kind: VersionedDataKind, item: dict): + self.__monitor_store_update(lambda: self.__store.upsert(kind, item)) + + def delete(self, kind: VersionedDataKind, key: str, version: int): + self.__monitor_store_update(lambda: self.__store.delete(kind, key, version)) + + def update_status(self, new_state: DataSourceState, new_error: Optional[DataSourceErrorInfo]): + status_to_broadcast = None + + try: + self.__lock.lock() + old_status = self.__status + + if new_state == DataSourceState.INTERRUPTED and old_status.state == DataSourceState.INITIALIZING: + new_state = DataSourceState.INITIALIZING + + if new_state == old_status.state and new_error is None: + return + + self.__status = DataSourceStatus( + new_state, + self.__status.since if new_state == self.__status.state else time.time(), + self.__status.error if new_error is None else new_error + ) + + status_to_broadcast = self.__status + finally: + self.__lock.unlock() + + if status_to_broadcast is not None: + self.__listeners.notify(status_to_broadcast) + + def __monitor_store_update(self, fn: Callable[[], None]): + try: + fn() + except Exception as e: + error_info = DataSourceErrorInfo( + DataSourceErrorKind.STORE_ERROR, + 0, + time.time(), + str(e) + ) + self.update_status(DataSourceState.INTERRUPTED, error_info) + raise + + +class DataSourceStatusProviderImpl(DataSourceStatusProvider): + def __init__(self, listeners: Listeners, updates_sink: DataSourceUpdateSinkImpl): + self.__listeners = listeners + self.__updates_sink = updates_sink + + @property + def status(self) -> DataSourceStatus: + return self.__updates_sink.status + + def add_listener(self, listener: Callable[[DataSourceStatus], None]): + self.__listeners.add(listener) + + def remove_listener(self, listener: Callable[[DataSourceStatus], None]): + self.__listeners.remove(listener) diff --git a/ldclient/impl/datasource/streaming.py b/ldclient/impl/datasource/streaming.py index b3044c1c..ec8debe9 100644 --- a/ldclient/impl/datasource/streaming.py +++ b/ldclient/impl/datasource/streaming.py @@ -1,8 +1,11 @@ from collections import namedtuple import json from threading import Thread +from typing import Optional + import time +from ldclient.interfaces import DataSourceErrorInfo, DataSourceErrorKind, DataSourceState from ldclient.impl.http import HTTPFactory, _http_factory from ldclient.impl.util import http_error_message, is_http_error_recoverable, log from ldclient.interfaces import UpdateProcessor @@ -32,6 +35,7 @@ def __init__(self, config, store, ready, diagnostic_accumulator): self.daemon = True self._uri = config.stream_base_uri + STREAM_ALL_PATH self._config = config + self._data_source_update_sink = config.data_source_update_sink self._store = store self._running = False self._ready = ready @@ -45,18 +49,47 @@ def run(self): self._connection_attempt_start_time = time.time() for action in self._sse.all: if isinstance(action, Event): + message_ok = False try: - message_ok = self._process_message(action) + message_ok = self._process_message(self._sink_or_store(), action) + except json.decoder.JSONDecodeError as e: + log.info("Error while handling stream event; will restart stream: %s" % e) + self._sse.interrupt() + + self._handle_error(e) except Exception as e: log.info("Error while handling stream event; will restart stream: %s" % e) self._sse.interrupt() + + if self._data_source_update_sink is not None: + error_info = DataSourceErrorInfo( + DataSourceErrorKind.UNKNOWN, + 0, + time.time(), + str(e) + ) + + self._data_source_update_sink.update_status( + DataSourceState.INTERRUPTED, + error_info + ) + if message_ok: self._record_stream_init(False) self._connection_attempt_start_time = None + + if self._data_source_update_sink is not None: + self._data_source_update_sink.update_status(DataSourceState.VALID, None) + if not self._ready.is_set(): log.info("StreamingUpdateProcessor initialized ok.") self._ready.set() elif isinstance(action, Fault): + # If the SSE client detects the stream has closed, then it will emit a fault with no-error. We can + # ignore this since we want the connection to continue. + if action.error is None: + continue + if not self._handle_error(action.error): break self._sse.close() @@ -71,7 +104,7 @@ def _create_sse_client(self) -> SSEClient: # We don't want the stream to use the same read timeout as the rest of the SDK. http_factory = _http_factory(self._config) stream_http_factory = HTTPFactory(http_factory.base_headers, http_factory.http_config, - override_read_timeout=stream_read_timeout) + override_read_timeout=stream_read_timeout) return SSEClient( connect=ConnectStrategy.http( url=self._uri, @@ -91,14 +124,31 @@ def _create_sse_client(self) -> SSEClient: ) def stop(self): + self.__stop_with_error_info(None) + + def __stop_with_error_info(self, error: Optional[DataSourceErrorInfo]): log.info("Stopping StreamingUpdateProcessor") self._running = False + if self._data_source_update_sink is None: + return + + self._data_source_update_sink.update_status( + DataSourceState.OFF, + error + ) + + def _sink_or_store(self): + if self._data_source_update_sink is None: + return self._store + + return self._data_source_update_sink + def initialized(self): return self._running and self._ready.is_set() is True and self._store.initialized is True # Returns True if we initialized the feature store - def _process_message(self, msg: Event) -> bool: + def _process_message(self, store, msg: Event) -> bool: if msg.event == 'put': all_data = json.loads(msg.data) init_data = { @@ -106,8 +156,8 @@ def _process_message(self, msg: Event) -> bool: SEGMENTS: all_data['data']['segments'] } log.debug("Received put event with %d flags and %d segments", - len(init_data[FEATURES]), len(init_data[SEGMENTS])) - self._store.init(init_data) + len(init_data[FEATURES]), len(init_data[SEGMENTS])) + store.init(init_data) return True elif msg.event == 'patch': payload = json.loads(msg.data) @@ -116,7 +166,7 @@ def _process_message(self, msg: Event) -> bool: log.debug("Received patch event for %s, New version: [%d]", path, obj.get("version")) target = StreamingUpdateProcessor._parse_path(path) if target is not None: - self._store.upsert(target.kind, obj) + store.upsert(target.kind, obj) else: log.warning("Patch for unknown path: %s", path) elif msg.event == 'delete': @@ -127,7 +177,7 @@ def _process_message(self, msg: Event) -> bool: log.debug("Received delete event for %s, New version: [%d]", path, version) target = StreamingUpdateProcessor._parse_path(path) if target is not None: - self._store.delete(target.kind, target.key, version) + store.delete(target.kind, target.key, version) else: log.warning("Delete for unknown path: %s", path) else: @@ -138,22 +188,60 @@ def _process_message(self, msg: Event) -> bool: def _handle_error(self, error: Exception) -> bool: if not self._running: return False # don't retry if we've been deliberately stopped - if isinstance(error, HTTPStatusError): + + if isinstance(error, json.decoder.JSONDecodeError): + error_info = DataSourceErrorInfo( + DataSourceErrorKind.INVALID_DATA, + 0, + time.time(), + str(error) + ) + + log.error("Unexpected error on stream connection: %s, will retry" % error) self._record_stream_init(True) self._connection_attempt_start_time = None + if self._data_source_update_sink is not None: + self._data_source_update_sink.update_status( + DataSourceState.INTERRUPTED, + error_info + ) + elif isinstance(error, HTTPStatusError): + self._record_stream_init(True) + self._connection_attempt_start_time = None + + error_info = DataSourceErrorInfo( + DataSourceErrorKind.ERROR_RESPONSE, + error.status, + time.time(), + str(error) + ) + http_error_message_result = http_error_message(error.status, "stream connection") if not is_http_error_recoverable(error.status): log.error(http_error_message_result) self._ready.set() # if client is initializing, make it stop waiting; has no effect if already inited + self.__stop_with_error_info(error_info) self.stop() return False else: log.warning(http_error_message_result) + + if self._data_source_update_sink is not None: + self._data_source_update_sink.update_status( + DataSourceState.INTERRUPTED, + error_info + ) else: log.warning("Unexpected error on stream connection: %s, will retry" % error) self._record_stream_init(True) self._connection_attempt_start_time = None + + if self._data_source_update_sink is not None: + self._data_source_update_sink.update_status( + DataSourceState.INTERRUPTED, + DataSourceErrorInfo(DataSourceErrorKind.UNKNOWN, 0, time.time(), str(error)) + ) # no stacktrace here because, for a typical connection error, it'll just be a lengthy tour of urllib3 internals self._connection_attempt_start_time = time.time() + self._sse.next_retry_delay return True diff --git a/ldclient/impl/integrations/files/file_data_source.py b/ldclient/impl/integrations/files/file_data_source.py index 1b292fee..d02d5b28 100644 --- a/ldclient/impl/integrations/files/file_data_source.py +++ b/ldclient/impl/integrations/files/file_data_source.py @@ -1,6 +1,8 @@ import json import os import traceback +import time +from typing import Optional have_yaml = False try: @@ -20,16 +22,19 @@ from ldclient.impl.repeating_task import RepeatingTask from ldclient.impl.util import log -from ldclient.interfaces import UpdateProcessor +from ldclient.interfaces import UpdateProcessor, DataSourceUpdateSink, DataSourceState, DataSourceErrorInfo, DataSourceErrorKind from ldclient.versioned_data_kind import FEATURES, SEGMENTS + def _sanitize_json_item(item): if not ('version' in item): item['version'] = 1 + class _FileDataSource(UpdateProcessor): - def __init__(self, store, ready, paths, auto_update, poll_interval, force_polling): + def __init__(self, store, data_source_update_sink: Optional[DataSourceUpdateSink], ready, paths, auto_update, poll_interval, force_polling): self._store = store + self._data_source_update_sink = data_source_update_sink self._ready = ready self._inited = False self._paths = paths @@ -40,6 +45,23 @@ def __init__(self, store, ready, paths, auto_update, poll_interval, force_pollin self._poll_interval = poll_interval self._force_polling = force_polling + def _sink_or_store(self): + """ + The original implementation of this class relied on the feature store + directly, which we are trying to move away from. Customers who might have + instantiated this directly for some reason wouldn't know they have to set + the config's sink manually, so we have to fall back to the store if the + sink isn't present. + + The next major release should be able to simplify this structure and + remove the need for fall back to the data store because the update sink + should always be present. + """ + if self._data_source_update_sink is None: + return self._store + + return self._data_source_update_sink + def start(self): self._load_all() @@ -65,13 +87,25 @@ def _load_all(self): except Exception as e: log.error('Unable to load flag data from "%s": %s' % (path, repr(e))) traceback.print_exc() + if self._data_source_update_sink is not None: + self._data_source_update_sink.update_status( + DataSourceState.INTERRUPTED, + DataSourceErrorInfo(DataSourceErrorKind.INVALID_DATA, 0, time.time, str(e)) + ) return try: - self._store.init(all_data) + self._sink_or_store().init(all_data) self._inited = True + if self._data_source_update_sink is not None: + self._data_source_update_sink.update_status(DataSourceState.VALID, None) except Exception as e: log.error('Unable to store data: %s' % repr(e)) traceback.print_exc() + if self._data_source_update_sink is not None: + self._data_source_update_sink.update_status( + DataSourceState.INTERRUPTED, + DataSourceErrorInfo(DataSourceErrorKind.UNKNOWN, 0, time.time, str(e)) + ) def _load_file(self, path, all_data): content = None diff --git a/ldclient/integrations/__init__.py b/ldclient/integrations/__init__.py index de2b10f8..79735fe7 100644 --- a/ldclient/integrations/__init__.py +++ b/ldclient/integrations/__init__.py @@ -251,4 +251,4 @@ def new_data_source(paths: List[str], :return: an object (actually a lambda) to be stored in the ``update_processor_class`` configuration property """ - return lambda config, store, ready : _FileDataSource(store, ready, paths, auto_update, poll_interval, force_polling) + return lambda config, store, ready : _FileDataSource(store, config.data_source_update_sink, ready, paths, auto_update, poll_interval, force_polling) diff --git a/ldclient/interfaces.py b/ldclient/interfaces.py index c3b1f2f7..ba3595d2 100644 --- a/ldclient/interfaces.py +++ b/ldclient/interfaces.py @@ -7,6 +7,8 @@ from abc import ABCMeta, abstractmethod, abstractproperty from .versioned_data_kind import VersionedDataKind from typing import Any, Callable, Mapping, Optional +from enum import Enum + class FeatureStore: """ @@ -291,12 +293,12 @@ def get_metadata(self) -> BigSegmentStoreMetadata: def get_membership(self, context_hash: str) -> Optional[dict]: """ Queries the store for a snapshot of the current segment state for a specific context. - + The context_hash is a base64-encoded string produced by hashing the context key as defined by the Big Segments specification; the store implementation does not need to know the details of how this is done, because it deals only with already-hashed keys, but the string can be assumed to only contain characters that are valid in base64. - + The return value should be either a ``dict``, or None if the context is not referenced in any big segments. Each key in the dictionary is a "segment reference", which is how segments are identified in Big Segment data. This string is not identical to the segment key-- the SDK @@ -306,7 +308,7 @@ def get_membership(self, context_hash: str) -> Optional[dict]: explicitly included (that is, if both an include and an exclude existed in the data, the include would take precedence). If the context's status in a particular segment is undefined, there should be no key or value for that segment. - + This dictionary may be cached by the SDK, so it should not be modified after it is created. It is a snapshot of the segment membership state at one point in time. @@ -338,7 +340,7 @@ def available(self) -> bool: """ True if the Big Segment store is able to respond to queries, so that the SDK can evaluate whether a user is in a segment or not. - + If this property is False, the store is not able to make queries (for instance, it may not have a valid database connection). In this case, the SDK will treat any reference to a Big Segment as if no users are included in that segment. Also, the :func:`ldclient.evaluation.EvaluationDetail.reason` @@ -346,7 +348,7 @@ def available(self) -> bool: available will have a ``bigSegmentsStatus`` of ``"STORE_ERROR"``. """ return self.__available - + @property def stale(self) -> bool: """ @@ -365,19 +367,19 @@ def stale(self) -> bool: class BigSegmentStoreStatusProvider: """ An interface for querying the status of a Big Segment store. - + The Big Segment store is the component that receives information about Big Segments, normally from a database populated by the LaunchDarkly Relay Proxy. Big Segments are a specific type of user segments. For more information, read the LaunchDarkly documentation: https://docs.launchdarkly.com/home/users/big-segments - + An implementation of this abstract class is returned by :func:`ldclient.client.LDClient.big_segment_store_status_provider`. Application code never needs to implement this interface. - + There are two ways to interact with the status. One is to simply get the current status; if its ``available`` property is true, then the SDK is able to evaluate user membership in Big Segments, and the ``stale`` property indicates whether the data might be out of date. - + The other way is to subscribe to status change notifications. Applications may wish to know if there is an outage in the Big Segment store, or if it has become stale (the Relay Proxy has stopped updating it with new data), since then flag evaluations that reference a Big Segment @@ -414,3 +416,282 @@ def remove_listener(self, listener: Callable[[BigSegmentStoreStatus], None]) -> this method does nothing """ pass + + +class DataSourceState(Enum): + """ + Enumeration representing the states a data source can be in at any given time. + """ + + INITIALIZING = 'initializing' + """ + The initial state of the data source when the SDK is being initialized. + + If it encounters an error that requires it to retry initialization, the state will remain at + :class:`DataSourceState.INITIALIZING` until it either succeeds and becomes {VALID}, or permanently fails and + becomes {OFF}. + """ + + VALID = 'valid' + """ + Indicates that the data source is currently operational and has not had any problems since the + last time it received data. + + In streaming mode, this means that there is currently an open stream connection and that at least + one initial message has been received on the stream. In polling mode, it means that the last poll + request succeeded. + """ + + INTERRUPTED = 'interrupted' + """ + Indicates that the data source encountered an error that it will attempt to recover from. + + In streaming mode, this means that the stream connection failed, or had to be dropped due to some + other error, and will be retried after a backoff delay. In polling mode, it means that the last poll + request failed, and a new poll request will be made after the configured polling interval. + """ + + OFF = 'off' + """ + Indicates that the data source has been permanently shut down. + + This could be because it encountered an unrecoverable error (for instance, the LaunchDarkly service + rejected the SDK key; an invalid SDK key will never become valid), or because the SDK client was + explicitly shut down. + """ + + +class DataSourceErrorKind(Enum): + """ + Enumeration representing the types of errors a data source can encounter. + """ + + UNKNOWN = 'unknown' + """ + An unexpected error, such as an uncaught exception. + """ + + NETWORK_ERROR = 'network_error' + """ + An I/O error such as a dropped connection. + """ + + ERROR_RESPONSE = 'error_response' + """ + The LaunchDarkly service returned an HTTP response with an error status. + """ + + INVALID_DATA = 'invalid_data' + """ + The SDK received malformed data from the LaunchDarkly service. + """ + + STORE_ERROR = 'store_error' + """ + The data source itself is working, but when it tried to put an update into the data store, the data + store failed (so the SDK may not have the latest data). + + Data source implementations do not need to report this kind of error; it will be automatically + reported by the SDK when exceptions are detected. + """ + + +class DataSourceErrorInfo: + """ + A description of an error condition that the data source encountered. + """ + + def __init__(self, kind: DataSourceErrorKind, status_code: int, time: float, message: Optional[str]): + self.__kind = kind + self.__status_code = status_code + self.__time = time + self.__message = message + + @property + def kind(self) -> DataSourceErrorKind: + """ + :return: The general category of the error + """ + return self.__kind + + @property + def status_code(self) -> int: + """ + :return: An HTTP status or zero. + """ + return self.__status_code + + @property + def time(self) -> float: + """ + :return: Unix timestamp when the error occurred + """ + return self.__time + + @property + def message(self) -> Optional[str]: + """ + :return: Message an error message if applicable, or None + """ + return self.__message + + +class DataSourceStatus: + """ + Information about the data source's status and about the last status change. + """ + + def __init__(self, state: DataSourceState, state_since: float, last_error: Optional[DataSourceErrorInfo]): + self.__state = state + self.__state_since = state_since + self.__last_error = last_error + + @property + def state(self) -> DataSourceState: + """ + :return: The basic state of the data source. + """ + return self.__state + + @property + def since(self) -> float: + """ + :return: Unix timestamp of the last state transition. + """ + return self.__state_since + + @property + def error(self) -> Optional[DataSourceErrorInfo]: + """ + :return: A description of the last error, or None if there are no errors since startup + """ + return self.__last_error + + +class DataSourceStatusProvider: + """ + An interface for querying the status of the SDK's data source. The data source is the component + that receives updates to feature flag data; normally this is a streaming connection, but it + could be polling or file data depending on your configuration. + + An implementation of this interface is returned by + :func:`ldclient.client.LDClient.data_source_status_provider`. Application code never needs to + implement this interface. + """ + __metaclass__ = ABCMeta + + @abstractproperty + def status(self) -> DataSourceStatus: + """ + Returns the current status of the data source. + + All the built-in data source implementations are guaranteed to update this status whenever they + successfully initialize, encounter an error, or recover after an error. + + For a custom data source implementation, it is the responsibility of the data source to push + status updates to the SDK; if it does not do so, the status will always be reported as + :class:`DataSourceState.INITIALIZING`. + + :return: the status + """ + pass + + @abstractmethod + def add_listener(self, listener: Callable[[DataSourceStatus], None]): + """ + Subscribes for notifications of status changes. + + The listener is a function or method that will be called with a single parameter: the + new ``DataSourceStatus``. + + :param listener: the listener to add + """ + pass + + @abstractmethod + def remove_listener(self, listener: Callable[[DataSourceStatus], None]): + """ + Unsubscribes from notifications of status changes. + + :param listener: a listener that was previously added with :func:`add_listener()`; if it was not, + this method does nothing + """ + pass + + +class DataSourceUpdateSink: + """ + Interface that a data source implementation will use to push data into + the SDK. + + The data source interacts with this object, rather than manipulating + the data store directly, so that the SDK can perform any other + necessary operations that must happen when data is updated. + """ + __metaclass__ = ABCMeta + + @abstractmethod + def init(self, all_data: Mapping[VersionedDataKind, Mapping[str, dict]]): + """ + Initializes (or re-initializes) the store with the specified set of entities. Any + existing entries will be removed. Implementations can assume that this data set is up to + date-- there is no need to perform individual version comparisons between the existing + objects and the supplied features. + + If possible, the store should update the entire data set atomically. If that is not possible, + it should iterate through the outer hash and then the inner hash using the existing iteration + order of those hashes (the SDK will ensure that the items were inserted into the hashes in + the correct order), storing each item, and then delete any leftover items at the very end. + + :param all_data: All objects to be stored + """ + pass + + @abstractmethod + def upsert(self, kind: VersionedDataKind, item: dict): + """ + Attempt to add an entity, or update an existing entity with the same key. An update + should only succeed if the new item's version is greater than the old one; + otherwise, the method should do nothing. + + :param kind: The kind of object to update + :param item: The object to update or insert + """ + pass + + @abstractmethod + def delete(self, kind: VersionedDataKind, key: str, version: int): + """ + Attempt to delete an entity if it exists. Deletion should only succeed if the + version parameter is greater than the existing entity's version; otherwise, the + method should do nothing. + + :param kind: The kind of object to delete + :param key: The key of the object to be deleted + :param version: The version for the delete operation + """ + pass + + @abstractmethod + def update_status(self, new_state: DataSourceState, new_error: Optional[DataSourceErrorInfo]): + """ + Informs the SDK of a change in the data source's status. + + Data source implementations should use this method if they have any + concept of being in a valid state, a temporarily disconnected state, + or a permanently stopped state. + + If `new_state` is different from the previous state, and/or + `new_error` is non-null, the SDK will start returning the new status + (adding a timestamp for the change) from :class:`DataSourceStatusProvider.status`, and + will trigger status change events to any registered listeners. + + A special case is that if {new_state} is :class:`DataSourceState.INTERRUPTED`, but the + previous state was :class:`DataSourceState.INITIALIZING`, the state will remain at + :class:`DataSourceState.INITIALIZING` because :class:`DataSourceState.INTERRUPTED` is only meaningful + after a successful startup. + + :param new_state: The updated state of the data source + :param new_error: An optional error if the new state is an error condition + """ + pass diff --git a/testing/impl/datasource/test_polling_processor.py b/testing/impl/datasource/test_polling_processor.py index 068d1684..24076557 100644 --- a/testing/impl/datasource/test_polling_processor.py +++ b/testing/impl/datasource/test_polling_processor.py @@ -5,11 +5,15 @@ from ldclient.config import Config from ldclient.feature_store import InMemoryFeatureStore from ldclient.impl.datasource.polling import PollingUpdateProcessor +from ldclient.impl.datasource.status import DataSourceUpdateSinkImpl +from ldclient.impl.listeners import Listeners from ldclient.impl.util import UnsuccessfulResponseException +from ldclient.interfaces import DataSourceStatus, DataSourceState, DataSourceErrorKind from ldclient.versioned_data_kind import FEATURES, SEGMENTS from testing.builders import * from testing.stub_util import MockFeatureRequester, MockResponse +from testing.test_util import SpyListener pp = None mock_requester = None @@ -43,19 +47,28 @@ def test_successful_request_puts_feature_data_in_store(): "segkey": segment.to_json_dict() } } - setup_processor(Config("SDK_KEY")) + + spy = SpyListener() + listeners = Listeners() + listeners.add(spy) + + config = Config("SDK_KEY") + config._data_source_update_sink = DataSourceUpdateSinkImpl(store, listeners) + setup_processor(config) ready.wait() assert store.get(FEATURES, "flagkey", lambda x: x) == flag assert store.get(SEGMENTS, "segkey", lambda x: x) == segment assert store.initialized assert pp.initialized() + assert len(spy.statuses) == 1 + assert spy.statuses[0].state == DataSourceState.VALID + assert spy.statuses[0].error is None # Note that we have to mock Config.poll_interval because Config won't let you set a value less than 30 seconds @mock.patch('ldclient.config.Config.poll_interval', new_callable=mock.PropertyMock, return_value=0.1) def test_general_connection_error_does_not_cause_immediate_failure(ignore_mock): mock_requester.exception = Exception("bad") - start_time = time.time() setup_processor(Config("SDK_KEY")) ready.wait(0.3) assert not pp.initialized() @@ -80,19 +93,45 @@ def test_http_503_error_does_not_cause_immediate_failure(): verify_recoverable_http_error(503) @mock.patch('ldclient.config.Config.poll_interval', new_callable=mock.PropertyMock, return_value=0.1) -def verify_unrecoverable_http_error(status, ignore_mock): - mock_requester.exception = UnsuccessfulResponseException(status) - setup_processor(Config("SDK_KEY")) +def verify_unrecoverable_http_error(http_status_code, ignore_mock): + spy = SpyListener() + listeners = Listeners() + listeners.add(spy) + + config = Config("SDK_KEY") + config._data_source_update_sink = DataSourceUpdateSinkImpl(store, listeners) + + mock_requester.exception = UnsuccessfulResponseException(http_status_code) + setup_processor(config) finished = ready.wait(0.5) assert finished assert not pp.initialized() assert mock_requester.request_count == 1 + assert len(spy.statuses) == 1 + assert spy.statuses[0].state == DataSourceState.OFF + assert spy.statuses[0].error.kind == DataSourceErrorKind.ERROR_RESPONSE + assert spy.statuses[0].error.status_code == http_status_code + @mock.patch('ldclient.config.Config.poll_interval', new_callable=mock.PropertyMock, return_value=0.1) -def verify_recoverable_http_error(status, ignore_mock): - mock_requester.exception = UnsuccessfulResponseException(status) - setup_processor(Config("SDK_KEY")) +def verify_recoverable_http_error(http_status_code, ignore_mock): + spy = SpyListener() + listeners = Listeners() + listeners.add(spy) + + config = Config("SDK_KEY") + config._data_source_update_sink = DataSourceUpdateSinkImpl(store, listeners) + + mock_requester.exception = UnsuccessfulResponseException(http_status_code) + setup_processor(config) finished = ready.wait(0.5) assert not finished assert not pp.initialized() assert mock_requester.request_count >= 2 + + assert len(spy.statuses) > 1 + + for status in spy.statuses: + assert status.state == DataSourceState.INITIALIZING + assert status.error.kind == DataSourceErrorKind.ERROR_RESPONSE + assert status.error.status_code == http_status_code diff --git a/testing/impl/datasource/test_streaming.py b/testing/impl/datasource/test_streaming.py index b017b9a8..1f52d73c 100644 --- a/testing/impl/datasource/test_streaming.py +++ b/testing/impl/datasource/test_streaming.py @@ -1,19 +1,23 @@ -import json import pytest from threading import Event +from typing import List import time from ldclient.config import Config from ldclient.feature_store import InMemoryFeatureStore from ldclient.impl.datasource.streaming import StreamingUpdateProcessor from ldclient.impl.events.diagnostics import _DiagnosticAccumulator +from ldclient.impl.listeners import Listeners from ldclient.version import VERSION from ldclient.versioned_data_kind import FEATURES, SEGMENTS +from ldclient.interfaces import DataSourceStatus, DataSourceState, DataSourceErrorKind +from ldclient.impl.datasource.status import DataSourceUpdateSinkImpl from testing.builders import * from testing.http_util import start_server, BasicResponse, CauseNetworkError, SequentialHandler from testing.proxy_test_util import do_proxy_tests -from testing.stub_util import make_delete_event, make_patch_event, make_put_event, stream_content +from testing.stub_util import make_delete_event, make_patch_event, make_put_event, make_invalid_put_event, stream_content +from testing.test_util import SpyListener brief_delay = 0.001 @@ -189,7 +193,7 @@ def test_retries_on_network_error(): server.for_path('/all', two_errors_then_success) with StreamingUpdateProcessor(config, store, ready, None) as sp: - sp.start() + sp.start() ready.wait(start_wait) assert sp.initialized() server.await_request @@ -207,7 +211,7 @@ def test_recoverable_http_error(status): server.for_path('/all', two_errors_then_success) with StreamingUpdateProcessor(config, store, ready, None) as sp: - sp.start() + sp.start() ready.wait(start_wait) assert sp.initialized() server.should_have_requests(3) @@ -224,7 +228,7 @@ def test_unrecoverable_http_error(status): server.for_path('/all', error_then_success) with StreamingUpdateProcessor(config, store, ready, None) as sp: - sp.start() + sp.start() ready.wait(5) assert not sp.initialized() server.should_have_requests(1) @@ -283,6 +287,108 @@ def test_records_diagnostic_on_stream_init_failure(): assert len(recorded_inits) == 2 assert recorded_inits[0]['failed'] is True assert recorded_inits[1]['failed'] is False +@pytest.mark.parametrize("status", [ 400, 408, 429, 500, 503 ]) +def test_status_includes_http_code(status): + error_handler = BasicResponse(status) + store = InMemoryFeatureStore() + ready = Event() + with start_server() as server: + with stream_content(make_put_event()) as stream: + two_errors_then_success = SequentialHandler(error_handler, error_handler, stream) + config = Config(sdk_key = 'sdk-key', stream_uri = server.uri, initial_reconnect_delay = brief_delay) + + spy = SpyListener() + listeners = Listeners() + listeners.add(spy) + + config._data_source_update_sink = DataSourceUpdateSinkImpl(store, listeners) + server.for_path('/all', two_errors_then_success) + + with StreamingUpdateProcessor(config, store, ready, None) as sp: + sp.start() + ready.wait(start_wait) + assert sp.initialized() + server.should_have_requests(3) + + assert len(spy.statuses) == 3 + + assert spy.statuses[0].state == DataSourceState.INITIALIZING + assert spy.statuses[0].error.kind == DataSourceErrorKind.ERROR_RESPONSE + assert spy.statuses[0].error.status_code == status + + assert spy.statuses[1].state == DataSourceState.INITIALIZING + assert spy.statuses[1].error.kind == DataSourceErrorKind.ERROR_RESPONSE + assert spy.statuses[1].error.status_code == status + + assert spy.statuses[2].state == DataSourceState.VALID + assert spy.statuses[2].error.kind == DataSourceErrorKind.ERROR_RESPONSE + assert spy.statuses[2].error.status_code == status + + +def test_invalid_json_triggers_listener(): + store = InMemoryFeatureStore() + ready = Event() + with start_server() as server: + with stream_content(make_put_event()) as valid_stream, stream_content(make_invalid_put_event()) as invalid_stream: + config = Config(sdk_key = 'sdk-key', stream_uri = server.uri, initial_reconnect_delay = brief_delay) + + statuses: List[DataSourceStatus] = [] + listeners = Listeners() + + def listener(s): + if len(statuses) == 0: + invalid_stream.close() + statuses.append(s) + listeners.add(listener) + + config._data_source_update_sink = DataSourceUpdateSinkImpl(store, listeners) + server.for_path('/all', SequentialHandler(invalid_stream, valid_stream)) + + with StreamingUpdateProcessor(config, store, ready, None) as sp: + sp.start() + ready.wait(start_wait) + assert sp.initialized() + server.should_have_requests(2) + + assert len(statuses) == 2 + + assert statuses[0].state == DataSourceState.INITIALIZING + assert statuses[0].error.kind == DataSourceErrorKind.INVALID_DATA + assert statuses[0].error.status_code == 0 + + assert statuses[1].state == DataSourceState.VALID + +def test_failure_transitions_from_valid(): + store = InMemoryFeatureStore() + ready = Event() + error_handler = BasicResponse(401) + with start_server() as server: + config = Config(sdk_key = 'sdk-key', stream_uri = server.uri, initial_reconnect_delay = brief_delay) + + spy = SpyListener() + listeners = Listeners() + listeners.add(spy) + + config._data_source_update_sink = DataSourceUpdateSinkImpl(store, listeners) + + # The sink has special handling for failures before the state is valid. So we manually set this to valid so we + # can exercise the other branching logic within the sink. + config.data_source_update_sink.update_status(DataSourceState.VALID, None) + server.for_path('/all', error_handler) + + with StreamingUpdateProcessor(config, store, ready, None) as sp: + sp.start() + ready.wait(start_wait) + server.should_have_requests(1) + + assert len(spy.statuses) == 2 + + assert spy.statuses[0].state == DataSourceState.VALID + + assert spy.statuses[1].state == DataSourceState.OFF + assert spy.statuses[1].error.kind == DataSourceErrorKind.ERROR_RESPONSE + assert spy.statuses[1].error.status_code == 401 + def expect_item(store, kind, item): assert store.get(kind, item['key'], lambda x: x) == item diff --git a/testing/stub_util.py b/testing/stub_util.py index 8bddcdad..2a94d9e8 100644 --- a/testing/stub_util.py +++ b/testing/stub_util.py @@ -20,6 +20,9 @@ def make_put_event(flags = [], segments = []): data = { "data": { "flags": make_items_map(flags), "segments": make_items_map(segments) } } return 'event:put\ndata: %s\n\n' % json.dumps(data) +def make_invalid_put_event(): + return 'event:put\ndata: {"data": {\n\n' + def make_patch_event(kind, item): path = '%s%s' % (kind.stream_api_path, item['key']) data = { "path": path, "data": item_as_json(item) } diff --git a/testing/test_file_data_source.py b/testing/test_file_data_source.py index 0ff3b0d6..74789450 100644 --- a/testing/test_file_data_source.py +++ b/testing/test_file_data_source.py @@ -1,5 +1,7 @@ import json import os +from typing import List + import pytest import tempfile import threading @@ -8,9 +10,14 @@ from ldclient.client import LDClient, Context from ldclient.config import Config from ldclient.feature_store import InMemoryFeatureStore +from ldclient.impl.datasource.status import DataSourceUpdateSinkImpl +from ldclient.impl.listeners import Listeners from ldclient.integrations import Files +from ldclient.interfaces import DataSourceStatus, DataSourceState, DataSourceErrorKind from ldclient.versioned_data_kind import FEATURES, SEGMENTS +from testing.test_util import SpyListener + have_yaml = False try: import yaml @@ -98,9 +105,9 @@ def teardown_function(): if data_source is not None: data_source.stop() -def make_data_source(**kwargs): +def make_data_source(config, **kwargs): global data_source - data_source = Files.new_data_source(**kwargs)(Config("SDK_KEY"), store, ready) + data_source = Files.new_data_source(**kwargs)(config, store, ready) return data_source def make_temp_file(content): @@ -116,7 +123,7 @@ def replace_file(path, content): def test_does_not_load_data_prior_to_start(): path = make_temp_file('{"flagValues":{"key":"value"}}') try: - source = make_data_source(paths = path) + source = make_data_source(Config("SDK_KEY"), paths = path) assert ready.is_set() is False assert source.initialized() is False assert store.initialized is False @@ -125,11 +132,40 @@ def test_does_not_load_data_prior_to_start(): def test_loads_flags_on_start_from_json(): path = make_temp_file(all_properties_json) + spy = SpyListener() + listeners = Listeners() + listeners.add(spy) + try: - source = make_data_source(paths = path) + config = Config("SDK_KEY") + config._data_source_update_sink = DataSourceUpdateSinkImpl(store, listeners) + source = make_data_source(config, paths = path) source.start() assert store.initialized is True assert sorted(list(store.all(FEATURES, lambda x: x).keys())) == all_flag_keys + + assert len(spy.statuses) == 1 + assert spy.statuses[0].state == DataSourceState.VALID + assert spy.statuses[0].error is None + finally: + os.remove(path) + +def test_handles_invalid_format_correctly(): + path = make_temp_file('{"flagValues":{') + spy = SpyListener() + listeners = Listeners() + listeners.add(spy) + + try: + config = Config("SDK_KEY") + config._data_source_update_sink = DataSourceUpdateSinkImpl(store, listeners) + source = make_data_source(config, paths = path) + source.start() + assert store.initialized is False + + assert len(spy.statuses) == 1 + assert spy.statuses[0].state == DataSourceState.INITIALIZING + assert spy.statuses[0].error.kind == DataSourceErrorKind.INVALID_DATA finally: os.remove(path) @@ -138,7 +174,7 @@ def test_loads_flags_on_start_from_yaml(): pytest.skip("skipping file source test with YAML because pyyaml isn't available") path = make_temp_file(all_properties_yaml) try: - source = make_data_source(paths = path) + source = make_data_source(Config("SDK_KEY"), paths = path) source.start() assert store.initialized is True assert sorted(list(store.all(FEATURES, lambda x: x).keys())) == all_flag_keys @@ -148,7 +184,7 @@ def test_loads_flags_on_start_from_yaml(): def test_sets_ready_event_and_initialized_on_successful_load(): path = make_temp_file(all_properties_json) try: - source = make_data_source(paths = path) + source = make_data_source(Config("SDK_KEY"), paths = path) source.start() assert source.initialized() is True assert ready.is_set() is True @@ -157,7 +193,7 @@ def test_sets_ready_event_and_initialized_on_successful_load(): def test_sets_ready_event_and_does_not_set_initialized_on_unsuccessful_load(): bad_file_path = 'no-such-file' - source = make_data_source(paths = bad_file_path) + source = make_data_source(Config("SDK_KEY"), paths = bad_file_path) source.start() assert source.initialized() is False assert ready.is_set() is True @@ -166,7 +202,7 @@ def test_can_load_multiple_files(): path1 = make_temp_file(flag_only_json) path2 = make_temp_file(segment_only_json) try: - source = make_data_source(paths = [ path1, path2 ]) + source = make_data_source(Config("SDK_KEY"), paths = [ path1, path2 ]) source.start() assert len(store.all(FEATURES, lambda x: x)) == 1 assert len(store.all(SEGMENTS, lambda x: x)) == 1 @@ -178,7 +214,7 @@ def test_does_not_allow_duplicate_keys(): path1 = make_temp_file(flag_only_json) path2 = make_temp_file(flag_only_json) try: - source = make_data_source(paths = [ path1, path2 ]) + source = make_data_source(Config("SDK_KEY"), paths = [ path1, path2 ]) source.start() assert len(store.all(FEATURES, lambda x: x)) == 0 finally: @@ -188,7 +224,7 @@ def test_does_not_allow_duplicate_keys(): def test_does_not_reload_modified_file_if_auto_update_is_off(): path = make_temp_file(flag_only_json) try: - source = make_data_source(paths = path) + source = make_data_source(Config("SDK_KEY"), paths = path) source.start() assert len(store.all(SEGMENTS, lambda x: x)) == 0 time.sleep(0.5) @@ -202,7 +238,7 @@ def do_auto_update_test(options): path = make_temp_file(flag_only_json) options['paths'] = path try: - source = make_data_source(**options) + source = make_data_source(Config("SDK_KEY"), **options) source.start() assert len(store.all(SEGMENTS, lambda x: x)) == 0 time.sleep(0.5) diff --git a/testing/test_ldclient_listeners.py b/testing/test_ldclient_listeners.py index b160135e..2a7798b7 100644 --- a/testing/test_ldclient_listeners.py +++ b/testing/test_ldclient_listeners.py @@ -1,10 +1,13 @@ from ldclient.client import LDClient, Config +from ldclient.interfaces import DataSourceState from ldclient.config import BigSegmentsConfig from testing.mock_components import MockBigSegmentStore -from testing.stub_util import MockEventProcessor, MockUpdateProcessor +from testing.stub_util import MockEventProcessor, MockUpdateProcessor, make_put_event, stream_content +from testing.http_util import start_server from queue import Queue + def test_big_segment_store_status_unavailable(): config=Config( sdk_key='SDK_KEY', @@ -45,3 +48,23 @@ def test_big_segment_store_status_updates(): assert status3.stale == False assert client.big_segment_store_status_provider.status.available == True + +def test_data_source_status_default(): + config=Config( + sdk_key='SDK_KEY', + event_processor_class=MockEventProcessor, + update_processor_class=MockUpdateProcessor + ) + client = LDClient(config) + assert client.data_source_status_provider.status.state == DataSourceState.INITIALIZING + + +def test_data_source_status_updates(): + with start_server() as stream_server: + with stream_content(make_put_event()) as stream_handler: + stream_server.for_path('/all', stream_handler) + config = Config(sdk_key='sdk-key', stream_uri=stream_server.uri, send_events=False) + + with LDClient(config=config) as client: + assert client.data_source_status_provider.status.state == DataSourceState.VALID + assert client.data_source_status_provider.status.error is None diff --git a/testing/test_util.py b/testing/test_util.py index 5329d018..82ec5667 100644 --- a/testing/test_util.py +++ b/testing/test_util.py @@ -14,3 +14,15 @@ def test_can_redact_password(password_redaction_tests): input, expected = password_redaction_tests assert redact_password(input) == expected + + +class SpyListener: + def __init__(self): + self._statuses = [] + + def __call__(self, status): + self._statuses.append(status) + + @property + def statuses(self): + return self._statuses