diff --git a/CHANGELOG.md b/CHANGELOG.md index cc9c77f5..e81931a8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,13 @@ All notable changes to the LaunchDarkly Python SDK will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org). +## [3.0.0] - 2016-08-22 +### Added +- Twisted support for LDD mode only. + +### Changed +- FeatureStore interface get() and all() methods now take an additional callback parameter. + ## [2.0.0] - 2016-08-10 ### Added - Support for multivariate feature flags. `variation` replaces `toggle` and can return a string, number, dict, or boolean value depending on how the flag is defined. diff --git a/MANIFEST.in b/MANIFEST.in index d3e3bd98..94847dd4 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,4 +1,5 @@ include requirements.txt include README.txt include test-requirements.txt +include twisted-requirements.txt include redis-requirements.txt \ No newline at end of file diff --git a/README.md b/README.md index daa689d2..53073479 100644 --- a/README.md +++ b/README.md @@ -26,13 +26,38 @@ Your first feature flag ----------------------- 1. Create a new feature flag on your [dashboard](https://app.launchdarkly.com) -2. In your application code, use the feature's key to check wthether the flag is on for each user: +2. In your application code, use the feature's key to check whether the flag is on for each user: if client.variation("your.flag.key", {"key": "user@test.com"}, False): # application code to show the feature else: # the code to run if the feature is off +Twisted +------- +Twisted is supported for LDD mode only. To run in Twisted/LDD mode, + +1. Use this dependency: + + ``` + ldclient-py[twisted]==3.0.0 + ``` +2. Configure the client: + + ``` + feature_store = TwistedRedisFeatureStore(url='YOUR_REDIS_URL', redis_prefix="ldd-restwrapper", expiration=0) + ldclient.config.feature_store = feature_store + + ldclient.config = ldclient.Config( + use_ldd=use_ldd, + event_consumer_class=TwistedEventConsumer, + ) + ldclient.sdk_key = 'YOUR_SDK_KEY' + ``` +3. Get the client: + + ```client = ldclient.get()``` + Learn more ----------- diff --git a/ldclient/client.py b/ldclient/client.py index 5c119ebb..3aab4d7e 100644 --- a/ldclient/client.py +++ b/ldclient/client.py @@ -120,10 +120,7 @@ def __init__(self, sdk_key, config=None, start_wait=5): self._event_consumer.start() if self._config.use_ldd: - if self._store.__class__ == "RedisFeatureStore": - log.info("Started LaunchDarkly Client in LDD mode") - return - log.error("LDD mode requires a RedisFeatureStore.") + log.info("Started LaunchDarkly Client in LDD mode") return if self._config.feature_requester_class: @@ -136,6 +133,7 @@ def __init__(self, sdk_key, config=None, start_wait=5): update_processor_ready = threading.Event() if self._config.update_processor_class: + log.info("Using user-specified update processor: " + str(self._config.update_processor_class)) self._update_processor = self._config.update_processor_class( sdk_key, self._config, self._feature_requester, self._store, update_processor_ready) else: @@ -230,23 +228,35 @@ def send_event(value, version=None): if user.get('key', "") == "": log.warn("User key is blank. Flag evaluation will proceed, but the user will not be stored in LaunchDarkly.") - flag = self._store.get(key) - if not flag: - log.warn("Feature Flag key: " + key + " not found in Feature Store. Returning default.") - send_event(default) + def cb(flag): + try: + if not flag: + log.warn("Feature Flag key: " + key + " not found in Feature Store. Returning default.") + send_event(default) + return default + + return self._evaluate_and_send_events(flag, user, default) + + except Exception as e: + log.error("Exception caught in variation: " + e.message + " for flag key: " + key + " and user: " + str(user)) + return default - value, events = evaluate(flag, user, self._store) + return self._store.get(key, cb) + + def _evaluate(self, flag, user): + return evaluate(flag, user, self._store) + + def _evaluate_and_send_events(self, flag, user, default): + value, events = self._evaluate(flag, user) for event in events or []: self._send_event(event) - log.debug("Sending event: " + str(event)) - - if value is not None: - send_event(value, flag.get('version')) - return value - send_event(default, flag.get('version')) - return default + if value is None: + value = default + self._send_event({'kind': 'feature', 'key': flag.get('key'), + 'user': user, 'value': value, 'default': default, 'version': flag.get('version')}) + return value def all_flags(self, user): if self._config.offline: @@ -261,7 +271,17 @@ def all_flags(self, user): log.warn("User or user key is None when calling all_flags(). Returning None.") return None - return {k: evaluate(v, user, self._store)[0] for k, v in self._store.all().items() or {}} + def cb(all_flags): + try: + return self._evaluate_multi(user, all_flags) + except Exception as e: + log.error("Exception caught in all_flags: " + e.message + " for user: " + str(user)) + return {} + + return self._store.all(cb) + + def _evaluate_multi(self, user, flags): + return {k: self._evaluate(v, user)[0] for k, v in flags.items() or {}} def secure_mode_hash(self, user): if user.get('key') is None: diff --git a/ldclient/feature_store.py b/ldclient/feature_store.py index f24335d2..e5a0f237 100644 --- a/ldclient/feature_store.py +++ b/ldclient/feature_store.py @@ -10,24 +10,24 @@ def __init__(self): self._initialized = False self._features = {} - def get(self, key): + def get(self, key, callback): try: self._lock.rlock() f = self._features.get(key) if f is None: log.debug("Attempted to get missing feature: " + str(key) + " Returning None") - return None + return callback(None) if 'deleted' in f and f['deleted']: log.debug("Attempted to get deleted feature: " + str(key) + " Returning None") - return None - return f + return callback(None) + return callback(f) finally: self._lock.runlock() - def all(self): + def all(self, callback): try: self._lock.rlock() - return dict((k, f) for k, f in self._features.items() if ('deleted' not in f) or not f['deleted']) + return callback(dict((k, f) for k, f in self._features.items() if ('deleted' not in f) or not f['deleted'])) finally: self._lock.runlock() diff --git a/ldclient/flag.py b/ldclient/flag.py index faa117ff..34211c8e 100644 --- a/ldclient/flag.py +++ b/ldclient/flag.py @@ -21,10 +21,7 @@ def evaluate(flag, user, store): if value is not None: return value, prereq_events - if 'offVariation' in flag and flag['offVariation']: - value = _get_variation(flag, flag['offVariation']) - return value, prereq_events - return None, prereq_events + return _get_off_variation(flag), prereq_events def _evaluate(flag, user, store, prereq_events=None): @@ -32,7 +29,7 @@ def _evaluate(flag, user, store, prereq_events=None): failed_prereq = None prereq_value = None for prereq in flag.get('prerequisites') or []: - prereq_flag = store.get(prereq.get('key')) + prereq_flag = store.get(prereq.get('key'), lambda x: x) if prereq_flag is None: log.warn("Missing prereq flag: " + prereq.get('key')) failed_prereq = prereq diff --git a/ldclient/interfaces.py b/ldclient/interfaces.py index d6504503..80ae7a8c 100644 --- a/ldclient/interfaces.py +++ b/ldclient/interfaces.py @@ -8,22 +8,23 @@ class FeatureStore(object): __metaclass__ = ABCMeta @abstractmethod - def get(self, key): + def get(self, key, callback): """ - Gets the data for a feature flag for evaluation - - :param key: The feature flag key + Gets a feature and calls the callback with the feature data to return the result + :param key: The feature key :type key: str - :return: The feature flag data - :rtype: dict + :param callback: The function that accepts the feature data and returns the feature value + :type callback: Function that processes the feature flag once received. + :return: The result of executing callback. """ @abstractmethod - def all(self): + def all(self, callback): """ Returns all feature flags and their data - - :rtype: dict[str, dict] + :param callback: The function that accepts the feature data and returns the feature value + :type callback: Function that processes the feature flags once received. + :rtype: The result of executing callback. """ @abstractmethod diff --git a/ldclient/redis_feature_store.py b/ldclient/redis_feature_store.py index 426fd977..7be129e3 100644 --- a/ldclient/redis_feature_store.py +++ b/ldclient/redis_feature_store.py @@ -40,43 +40,44 @@ def init(self, features): pipe.hset(self._features_key, k, f_json) self._cache[k] = f pipe.execute() + log.info("Initialized RedisFeatureStore with " + str(len(features)) + " feature flags") - def all(self): + def all(self, callback): r = redis.Redis(connection_pool=self._pool) all_features = r.hgetall(self._features_key) if all_features is None or all_features is "": log.warn("RedisFeatureStore: call to get all flags returned no results. Returning None.") - return None + return callback(None) results = {} for k, f_json in all_features.items() or {}: f = json.loads(f_json.decode('utf-8')) if 'deleted' in f and f['deleted'] is False: results[f['key']] = f - return results + return callback(results) - def get(self, key): + def get(self, key, callback=lambda x: x): f = self._cache.get(key) if f is not None: # reset ttl self._cache[key] = f if f.get('deleted', False) is True: log.warn("RedisFeatureStore: get returned deleted flag from in-memory cache. Returning None.") - return None - return f + return callback(None) + return callback(f) r = redis.Redis(connection_pool=self._pool) f_json = r.hget(self._features_key, key) if f_json is None or f_json is "": log.warn("RedisFeatureStore: feature flag with key: " + key + " not found in Redis. Returning None.") - return None + return callback(None) f = json.loads(f_json.decode('utf-8')) if f.get('deleted', False) is True: log.warn("RedisFeatureStore: get returned deleted flag from Redis. Returning None.") - return None + return callback(None) self._cache[key] = f - return f + return callback(f) def delete(self, key, version): r = redis.Redis(connection_pool=self._pool) diff --git a/ldclient/streaming.py b/ldclient/streaming.py index 265b425c..bbf65d16 100644 --- a/ldclient/streaming.py +++ b/ldclient/streaming.py @@ -9,7 +9,6 @@ class StreamingUpdateProcessor(Thread, UpdateProcessor): - def __init__(self, sdk_key, config, requester, store, ready): Thread.__init__(self) self.daemon = True @@ -31,7 +30,8 @@ def run(self): for msg in messages: if not self._running: break - self.process_message(self._store, self._requester, msg, self._ready) + if self.process_message(self._store, self._requester, msg, self._ready) is True: + self._ready.set() except Exception as e: log.error("Could not connect to LaunchDarkly stream: " + str(e.message) + " waiting 1 second before trying again.") @@ -51,8 +51,8 @@ def process_message(store, requester, msg, ready): if msg.event == 'put': store.init(payload) if not ready.is_set() and store.initialized: - ready.set() log.info("StreamingUpdateProcessor initialized ok") + return True elif msg.event == 'patch': key = payload['path'][1:] feature = payload['data'] @@ -64,12 +64,13 @@ def process_message(store, requester, msg, ready): elif msg.event == "indirect/put": store.init(requester.get_all()) if not ready.is_set() and store.initialized: - ready.set() log.info("StreamingUpdateProcessor initialized ok") + return True elif msg.event == 'delete': key = payload['path'][1:] # noinspection PyShadowingNames version = payload['version'] store.delete(key, version) else: - log.warning('Unhandled event in stream processor: ' + msg.event) \ No newline at end of file + log.warning('Unhandled event in stream processor: ' + msg.event) + return False diff --git a/ldclient/twisted_client.py b/ldclient/twisted_client.py new file mode 100644 index 00000000..90ce50dc --- /dev/null +++ b/ldclient/twisted_client.py @@ -0,0 +1,80 @@ +from functools import partial + +from twisted.internet import defer +from twisted.internet.defer import DeferredList + +from ldclient import LDClient +from ldclient import log +from ldclient.flag import _get_variation, _evaluate_index, _get_off_variation + + +class TwistedLDClient(LDClient): + @defer.inlineCallbacks + def _evaluate_and_send_events(self, flag, user, default): + value = yield self._evaluate(flag, user) + if value is None: + value = default + log.info("value: " + str(value)) + self._send_event({'kind': 'feature', 'key': flag.get('key'), 'user': user, 'value': value, + 'default': default, 'version': flag.get('version')}) + defer.returnValue(value) + + def _evaluate(self, flag, user): + if flag.get('on', False): + def cb(result): + if result is not None: + return result + return _get_off_variation(flag) + + value = self._evaluate_internal(flag, user) + value.addBoth(cb) + return value + + return _get_off_variation(flag) + + def _evaluate_internal(self, flag, user): + def check_prereq_results(result): + prereq_ok = True + for (success, prereq_ok) in result: + if success is False or prereq_ok is False: + prereq_ok = False + + if prereq_ok is True: + index = _evaluate_index(flag, user) + variation = _get_variation(flag, index) + return variation + return None + + results = DeferredList(map(partial(self._evaluate_prereq, user), flag.get('prerequisites') or [])) + results.addBoth(check_prereq_results) + return results + + # returns False if the prereq failed or there was an error evaluating it. Otherwise returns True + def _evaluate_prereq(self, user, prereq): + + @defer.inlineCallbacks + def eval_prereq(prereq_flag): + if prereq_flag is None: + log.warn("Missing prereq flag: " + prereq.get('key')) + defer.returnValue(False) + if prereq_flag.get('on', False) is True: + prereq_value = yield self._evaluate_internal(prereq_flag, user) + variation = _get_variation(prereq_flag, prereq.get('variation')) + if prereq_value is None or not prereq_value == variation: + ok = False + else: + ok = True + else: + ok = False + defer.returnValue(ok) + + result = self._store.get(prereq.get('key'), eval_prereq) + return result + + @defer.inlineCallbacks + def _evaluate_multi(self, user, flags): + results = {} + for k, v in flags.items() or {}: + r = yield self._evaluate(v, user) + results[k] = r + defer.returnValue(results) diff --git a/ldclient/twisted_event_consumer.py b/ldclient/twisted_event_consumer.py new file mode 100644 index 00000000..18b444d6 --- /dev/null +++ b/ldclient/twisted_event_consumer.py @@ -0,0 +1,88 @@ +from __future__ import absolute_import + +import errno +import json + +import txrequests +from cachecontrol import CacheControl +from queue import Empty +from requests.packages.urllib3.exceptions import ProtocolError +from twisted.internet import task, defer + +from ldclient.interfaces import EventConsumer +from ldclient.util import _headers, log + + +class TwistedEventConsumer(EventConsumer): + + def __init__(self, queue, sdk_key, config): + self._queue = queue + """ :type: queue.Queue """ + + self._session = CacheControl(txrequests.Session()) + """ :type: txrequests.Session """ + + self._sdk_key = sdk_key + self._config = config + """ :type: ldclient.twisted.TwistedConfig """ + + self._looping_call = None + """ :type: LoopingCall""" + + def start(self): + self._looping_call = task.LoopingCall(self._consume) + self._looping_call.start(5) + + def stop(self): + self._looping_call.stop() + + def is_alive(self): + return self._looping_call is not None and self._looping_call.running + + def flush(self): + return self._consume() + + def _consume(self): + items = [] + try: + while True: + items.append(self._queue.get_nowait()) + except Empty: + pass + + if items: + return self.send_batch(items) + + @defer.inlineCallbacks + def send_batch(self, events): + @defer.inlineCallbacks + def do_send(should_retry): + # noinspection PyBroadException + try: + if isinstance(events, dict): + body = [events] + else: + body = events + hdrs = _headers(self._sdk_key) + r = yield self._session.post(self._config.events_uri, + headers=hdrs, + timeout=(self._config.connect_timeout, self._config.read_timeout), + data=json.dumps(body)) + r.raise_for_status() + except ProtocolError as e: + inner = e.args[1] + if inner.errno == errno.ECONNRESET and should_retry: + log.warning( + 'ProtocolError exception caught while sending events. Retrying.') + yield do_send(False) + else: + log.exception( + 'Unhandled exception in event consumer. Analytics events were not processed.') + except: + log.exception( + 'Unhandled exception in event consumer. Analytics events were not processed.') + try: + yield do_send(True) + finally: + for _ in events: + self._queue.task_done() diff --git a/ldclient/twisted_redis_feature_store.py b/ldclient/twisted_redis_feature_store.py new file mode 100644 index 00000000..2307a335 --- /dev/null +++ b/ldclient/twisted_redis_feature_store.py @@ -0,0 +1,133 @@ +from __future__ import absolute_import + +import json +import urlparse + +from twisted.internet import defer +from twisted.internet import protocol, reactor +from txredis.client import RedisClient + +from ldclient.expiringdict import ExpiringDict +from ldclient.interfaces import FeatureStore +from ldclient.redis_feature_store import ForgetfulDict, INIT_KEY +from ldclient.util import log + + +class TwistedRedisFeatureStore(FeatureStore): + def __init__(self, + url='redis://localhost:6379/0', + expiration=15, + capacity=1000, + redis_prefix='launchdarkly'): + self._url = url + parsed_url = urlparse.urlparse(url) + self._redis_host = parsed_url.hostname + self._redis_port = parsed_url.port + self._features_key = "{}:features".format(redis_prefix) + self._cache = ForgetfulDict() if expiration == 0 else ExpiringDict(max_len=capacity, + max_age_seconds=expiration) + log.info("Created TwistedRedisFeatureStore with url: " + url + " using key: " + self._features_key) + + def _get_connection(self): + client_creator = protocol.ClientCreator(reactor, RedisClient) + return client_creator.connectTCP(self._redis_host, self._redis_port) + + def initialized(self): + initialized = self._cache.get(INIT_KEY) + if initialized: + # reset ttl + self._cache[INIT_KEY] = True + return True + + @defer.inlineCallbacks + def redis_initialized(): + r = yield self._get_connection() + """ :type: RedisClient """ + i = yield r.exists(self._features_key) + if i: + # reset ttl + self._cache[INIT_KEY] = True + defer.returnValue(i) + + initialized = redis_initialized() + return initialized + + def upsert(self, key, feature): + raise NotImplementedError() + + def all(self, callback): + @defer.inlineCallbacks + def redis_get_all(): + r = None + try: + r = yield self._get_connection() + """ :type: RedisClient """ + all_features = yield r.hgetall(self._features_key) + if all_features is None or all_features is "": + log.warn("TwistedRedisFeatureStore: call to get all flags returned no results. Returning None.") + defer.returnValue(None) + + results = {} + for k, f_json in all_features.items() or {}: + f = json.loads(f_json.decode('utf-8')) + if 'deleted' in f and f['deleted'] is False: + results[f['key']] = f + defer.returnValue(results) + except Exception as e: + log.error("Could not connect to Redis using url: " + self._url + " with error message: " + e.message) + defer.returnValue(None) + finally: + if r: + r.quit() + defer.returnValue(None) + + all_flags = redis_get_all() + all_flags.addBoth(callback) + return all_flags + + def delete(self, key, version): + raise NotImplementedError() + + def init(self, features): + raise NotImplementedError() + + def get(self, key, callback): + @defer.inlineCallbacks + def redis_get(): + r = None + try: + r = yield self._get_connection() + """ :type: RedisClient """ + get_result = yield r.hget(self._features_key, key) + if not get_result: + log.warn("Didn't get response from redis for key: " + key + " Returning None.") + defer.returnValue(None) + f_json = get_result.get(key) + if f_json is None or f_json is "": + log.warn( + "TwistedRedisFeatureStore: feature flag with key: " + key + " not found in Redis. Returning None.") + defer.returnValue(None) + + f = json.loads(f_json.decode('utf-8')) + if f.get('deleted', False) is True: + log.warn("TwistedRedisFeatureStore: get returned deleted flag from Redis. Returning None.") + defer.returnValue(None) + self._cache[key] = f + defer.returnValue(f) + except Exception as e: + log.error("Could not connect to Redis using url: " + self._url + " with error message: " + e.message) + defer.returnValue(None) + finally: + if r: + r.quit() + defer.returnValue(None) + + cached = self._cache.get(key) + if cached is not None: + # reset ttl + self._cache[key] = cached + return callback(cached) + + f = redis_get() + f.addBoth(callback) + return f diff --git a/ldclient/version.py b/ldclient/version.py index 21014090..ea9d6945 100644 --- a/ldclient/version.py +++ b/ldclient/version.py @@ -1 +1 @@ -VERSION = "2.0.0" +VERSION = "3.0.0" diff --git a/setup.py b/setup.py index 25f530b4..4ad357b5 100644 --- a/setup.py +++ b/setup.py @@ -10,12 +10,15 @@ # parse_requirements() returns generator of pip.req.InstallRequirement objects install_reqs = parse_requirements('requirements.txt', session=uuid.uuid1()) test_reqs = parse_requirements('test-requirements.txt', session=uuid.uuid1()) +twisted_reqs = parse_requirements( + 'twisted-requirements.txt', session=uuid.uuid1()) redis_reqs = parse_requirements('redis-requirements.txt', session=uuid.uuid1()) # reqs is a list of requirement # e.g. ['django==1.5.1', 'mezzanine==1.4.6'] reqs = [str(ir.req) for ir in install_reqs] testreqs = [str(ir.req) for ir in test_reqs] +txreqs = [str(ir.req) for ir in twisted_reqs] redisreqs = [str(ir.req) for ir in redis_reqs] @@ -36,7 +39,7 @@ def run(self): setup( name='ldclient-py', - version='2.0.0', + version='3.0.0', author='Catamorphic Co.', author_email='team@catamorphic.com', packages=['ldclient'], @@ -50,6 +53,7 @@ def run(self): 'Programming Language :: Python :: 2 :: Only', ], extras_require={ + "twisted": txreqs, "redis": redisreqs }, tests_require=testreqs, diff --git a/testing/test_feature_store.py b/testing/test_feature_store.py index 96bb140c..ef458986 100644 --- a/testing/test_feature_store.py +++ b/testing/test_feature_store.py @@ -66,43 +66,43 @@ def test_initialized(self, store): def test_get_existing_feature(self, store): store = self.base_initialized_store(store) expected = self.make_feature('foo', 10) - assert store.get('foo') == expected + assert store.get('foo', lambda x: x) == expected def test_get_nonexisting_feature(self, store): store = self.base_initialized_store(store) - assert store.get('biz') is None + assert store.get('biz', lambda x: x) is None def test_upsert_with_newer_version(self, store): store = self.base_initialized_store(store) new_ver = self.make_feature('foo', 11) store.upsert('foo', new_ver) - assert store.get('foo') == new_ver + assert store.get('foo', lambda x: x) == new_ver def test_upsert_with_older_version(self, store): store = self.base_initialized_store(store) new_ver = self.make_feature('foo', 9) expected = self.make_feature('foo', 10) store.upsert('foo', new_ver) - assert store.get('foo') == expected + assert store.get('foo', lambda x: x) == expected def test_upsert_with_new_feature(self, store): store = self.base_initialized_store(store) new_ver = self.make_feature('biz', 1) store.upsert('biz', new_ver) - assert store.get('biz') == new_ver + assert store.get('biz', lambda x: x) == new_ver def test_delete_with_newer_version(self, store): store = self.base_initialized_store(store) store.delete('foo', 11) - assert store.get('foo') is None + assert store.get('foo', lambda x: x) is None def test_delete_unknown_feature(self, store): store = self.base_initialized_store(store) store.delete('biz', 11) - assert store.get('biz') is None + assert store.get('biz', lambda x: x) is None def test_delete_with_older_version(self, store): store = self.base_initialized_store(store) store.delete('foo', 9) expected = self.make_feature('foo', 10) - assert store.get('foo') == expected + assert store.get('foo', lambda x: x) == expected diff --git a/twisted-requirements.txt b/twisted-requirements.txt new file mode 100644 index 00000000..e99d9e35 --- /dev/null +++ b/twisted-requirements.txt @@ -0,0 +1,5 @@ +cryptography>=1.0 +pyOpenSSL>=0.14 +service_identity>=16.0 +txredis>=2.4 +txrequests>=0.9.2