From de7d3b0e2c483c9b6db4a9cbb4865e7eac46fd1a Mon Sep 17 00:00:00 2001 From: Dan Richelson Date: Wed, 10 Aug 2016 11:27:16 -0700 Subject: [PATCH 1/9] [wip] twisted redis feature store --- ldclient/client.py | 1 + ldclient/redis_feature_store.py | 1 + ldclient/streaming.py | 11 +- ldclient/twisted_impls.py | 23 ++-- ldclient/twisted_redis_feature_store.py | 133 ++++++++++++++++++++++++ twisted-requirements.txt | 10 +- 6 files changed, 164 insertions(+), 15 deletions(-) create mode 100644 ldclient/twisted_redis_feature_store.py diff --git a/ldclient/client.py b/ldclient/client.py index 5c119ebb..a8c9b08f 100644 --- a/ldclient/client.py +++ b/ldclient/client.py @@ -136,6 +136,7 @@ def __init__(self, sdk_key, config=None, start_wait=5): update_processor_ready = threading.Event() if self._config.update_processor_class: + log.info("Using user-specified update processor: " + str(self._config.update_processor_class)) self._update_processor = self._config.update_processor_class( sdk_key, self._config, self._feature_requester, self._store, update_processor_ready) else: diff --git a/ldclient/redis_feature_store.py b/ldclient/redis_feature_store.py index 426fd977..5e979669 100644 --- a/ldclient/redis_feature_store.py +++ b/ldclient/redis_feature_store.py @@ -40,6 +40,7 @@ def init(self, features): pipe.hset(self._features_key, k, f_json) self._cache[k] = f pipe.execute() + log.info("Initialized RedisFeatureStore with " + str(len(features)) + " feature flags") def all(self): r = redis.Redis(connection_pool=self._pool) diff --git a/ldclient/streaming.py b/ldclient/streaming.py index 265b425c..bbf65d16 100644 --- a/ldclient/streaming.py +++ b/ldclient/streaming.py @@ -9,7 +9,6 @@ class StreamingUpdateProcessor(Thread, UpdateProcessor): - def __init__(self, sdk_key, config, requester, store, ready): Thread.__init__(self) self.daemon = True @@ -31,7 +30,8 @@ def run(self): for msg in messages: if not self._running: break - self.process_message(self._store, self._requester, msg, self._ready) + if self.process_message(self._store, self._requester, msg, self._ready) is True: + self._ready.set() except Exception as e: log.error("Could not connect to LaunchDarkly stream: " + str(e.message) + " waiting 1 second before trying again.") @@ -51,8 +51,8 @@ def process_message(store, requester, msg, ready): if msg.event == 'put': store.init(payload) if not ready.is_set() and store.initialized: - ready.set() log.info("StreamingUpdateProcessor initialized ok") + return True elif msg.event == 'patch': key = payload['path'][1:] feature = payload['data'] @@ -64,12 +64,13 @@ def process_message(store, requester, msg, ready): elif msg.event == "indirect/put": store.init(requester.get_all()) if not ready.is_set() and store.initialized: - ready.set() log.info("StreamingUpdateProcessor initialized ok") + return True elif msg.event == 'delete': key = payload['path'][1:] # noinspection PyShadowingNames version = payload['version'] store.delete(key, version) else: - log.warning('Unhandled event in stream processor: ' + msg.event) \ No newline at end of file + log.warning('Unhandled event in stream processor: ' + msg.event) + return False diff --git a/ldclient/twisted_impls.py b/ldclient/twisted_impls.py index 97ddd4bc..780366f1 100644 --- a/ldclient/twisted_impls.py +++ b/ldclient/twisted_impls.py @@ -16,7 +16,6 @@ class TwistedHttpFeatureRequester(FeatureRequester): - def __init__(self, sdk_key, config): self._sdk_key = sdk_key self._session = CacheControl(txrequests.Session()) @@ -56,7 +55,6 @@ def _get_all(self): class TwistedConfig(Config): - def __init__(self, *args, **kwargs): self.update_processor_class = TwistedStreamProcessor self.event_consumer_class = TwistedEventConsumer @@ -68,10 +66,20 @@ class TwistedStreamProcessor(UpdateProcessor): def close(self): self.sse_client.stop() - def __init__(self, sdk_key, config, store, requester, ready): + def __init__(self, sdk_key, config, requester, store, ready): + self._uri = config.stream_uri self._store = store self._requester = requester self._ready = ready + + def process(): + init_ok = partial(StreamingUpdateProcessor.process_message, + self._store, + self._requester, + self._ready) + if init_ok is True: + self._ready.set() + self.sse_client = TwistedSSEClient(config.stream_uri, headers=_stream_headers(sdk_key, "PythonTwistedClient"), verify_ssl=config.verify_ssl, @@ -80,8 +88,10 @@ def __init__(self, sdk_key, config, store, requester, ready): self._requester, self._ready)) self.running = False + log.info("Created TwistedStreamProcessor with FeatureStore: " + str(self._store)) def start(self): + log.info("Starting TwistedStreamProcessor connecting to uri: " + self._uri) self.sse_client.start() self.running = True @@ -89,14 +99,14 @@ def stop(self): self.sse_client.stop() def initialized(self): - return self._ready.is_set() and self._store.initialized() + # return self._ready.is_set() and self._store.initialized() + return self._store.initialized() def is_alive(self): return self.running and self._store.initialized() class TwistedEventConsumer(EventConsumer): - def __init__(self, queue, sdk_key, config): self._queue = queue """ @type: queue.Queue """ @@ -112,6 +122,7 @@ def __init__(self, queue, sdk_key, config): """ :type: LoopingCall""" def start(self): + log.info("Starting TwistedEventConsumer") self._looping_call = task.LoopingCall(self._consume) self._looping_call.start(5) @@ -163,6 +174,7 @@ def do_send(should_retry): except: log.exception( 'Unhandled exception in event consumer. Analytics events were not processed.') + try: yield do_send(True) finally: @@ -171,7 +183,6 @@ def do_send(should_retry): class TwistedLDClient(LDClient): - def __init__(self, sdk_key, config=None): if config is None: config = TwistedConfig() diff --git a/ldclient/twisted_redis_feature_store.py b/ldclient/twisted_redis_feature_store.py new file mode 100644 index 00000000..d7bc1c9c --- /dev/null +++ b/ldclient/twisted_redis_feature_store.py @@ -0,0 +1,133 @@ +from __future__ import absolute_import + +import json +from urlparse import urlparse + +from twisted.internet import defer +from twisted.internet import protocol, reactor +from txredis.client import RedisClient + +from ldclient.expiringdict import ExpiringDict +from ldclient.interfaces import FeatureStore +from ldclient.redis_feature_store import ForgetfulDict, INIT_KEY +from ldclient.util import log + + +class TwistedRedisFeatureStore(FeatureStore): + def __init__(self, + url='redis://localhost:6379/0', + expiration=15, + capacity=1000, + redis_prefix='launchdarkly'): + self._url = url + parsed_url = urlparse(url) + self._redis_host = parsed_url.hostname + self._redis_port = parsed_url.port + self._features_key = "{}:features".format(redis_prefix) + self._cache = ForgetfulDict() if expiration == 0 else ExpiringDict(max_len=capacity, + max_age_seconds=expiration) + log.info("Created TwistedRedisFeatureStore with url: " + url) + + def _get_connection(self): + client_creator = protocol.ClientCreator(reactor, RedisClient) + return client_creator.connectTCP(self._redis_host, self._redis_port) + + # @property + def initialized(self): + i = self._initialized() + return i + + @defer.inlineCallbacks + def _initialized(self): + r = yield self._get_connection() + """ :type: RedisClient """ + i = yield r.exists(self._features_key) + defer.returnValue(i) + + @defer.inlineCallbacks + def upsert(self, key, feature): + r = yield self._get_connection() + """ :type: RedisClient """ + r.watch(self._features_key) + old = yield self.get(key) + if old: + if old['version'] >= feature['version']: + r.unwatch() + return + + feature_json = json.dumps(feature) + r.hset(self._features_key, key, feature_json) + self._cache[key] = feature + r.unwatch() + + @defer.inlineCallbacks + def all(self): + r = yield self._get_connection() + """ :type: RedisClient """ + all_features = yield r.hgetall(self._features_key) + if all_features is None or all_features is "": + log.warn("TwistedRedisFeatureStore: call to get all flags returned no results. Returning None.") + defer.returnValue(None) + + results = {} + for k, f_json in all_features.items() or {}: + f = json.loads(f_json.decode('utf-8')) + if 'deleted' in f and f['deleted'] is False: + results[f['key']] = f + defer.returnValue(results) + + @defer.inlineCallbacks + def delete(self, key, version): + r = yield self._get_connection() + """ :type: RedisClient """ + r.watch(self._features_key) + f_json = yield r.hget(self._features_key, key) + if f_json: + f = json.loads(f_json.decode('utf-8')) + if f is not None and f['version'] < version: + f['deleted'] = True + f['version'] = version + elif f is None: + f = {'deleted': True, 'version': version} + f_json = json.dumps(f) + r.hset(self._features_key, key, f_json) + self._cache[key] = f + r.unwatch() + + @defer.inlineCallbacks + def init(self, features): + r = yield self._get_connection() + """ :type: RedisClient """ + + r.multi() + r.delete(self._features_key) + self._cache.clear() + + for k, f in features.items(): + f_json = json.dumps(f) + r.hset(self._features_key, k, f_json) + self._cache[k] = f + r.execute() + log.info("Initialized TwistedRedisFeatureStore with " + str(len(features)) + " feature flags") + + @defer.inlineCallbacks + def get(self, key): + cached = self._cache.get(key) + if cached is not None: + defer.returnValue(cached) + else: + r = yield self._get_connection() + """ :type: RedisClient """ + f_json = yield r.hget(self._features_key, key) + if f_json is None or f_json is "": + log.warn( + "TwistedRedisFeatureStore: feature flag with key: " + key + " not found in Redis. Returning None.") + defer.returnValue(None) + + f = json.loads(f_json.decode('utf-8')) + if f.get('deleted', False) is True: + log.warn("TwistedRedisFeatureStore: get returned deleted flag from Redis. Returning None.") + defer.returnValue(None) + + self._cache[key] = f + defer.returnValue(f) diff --git a/twisted-requirements.txt b/twisted-requirements.txt index 787ab140..957f6c3f 100644 --- a/twisted-requirements.txt +++ b/twisted-requirements.txt @@ -1,4 +1,6 @@ -txrequests>=0.9 -pyOpenSSL>=0.14 -cryptography>=1.0 -service_identity>=16.0 \ No newline at end of file +cryptography>=1.4 +pyOpenSSL>=16.0.0 +service_identity>=16.0 +twisted>=16.3.0 +txredis>=2.4 +txrequests>=0.9.2 \ No newline at end of file From 2134d2cf8340c7f16ceb8c7969c36cc7554c4d5e Mon Sep 17 00:00:00 2001 From: Dan Richelson Date: Wed, 10 Aug 2016 14:03:03 -0700 Subject: [PATCH 2/9] [wip] twisted stuff not really working tho --- ldclient/twisted_impls.py | 10 +-- ldclient/twisted_redis_feature_store.py | 30 +++++---- ldclient/twisted_sse.py | 1 + testing/test_integration_twisted.py | 87 +++++++++++++++++++++++++ 4 files changed, 112 insertions(+), 16 deletions(-) create mode 100644 testing/test_integration_twisted.py diff --git a/ldclient/twisted_impls.py b/ldclient/twisted_impls.py index 780366f1..7ce1cf90 100644 --- a/ldclient/twisted_impls.py +++ b/ldclient/twisted_impls.py @@ -88,10 +88,10 @@ def process(): self._requester, self._ready)) self.running = False - log.info("Created TwistedStreamProcessor with FeatureStore: " + str(self._store)) + log.info("Created TwistedStreamProcessor connecting to uri: " + self._uri + " using feature store: " + str(self._store)) def start(self): - log.info("Starting TwistedStreamProcessor connecting to uri: " + self._uri) + log.info("Starting TwistedStreamProcessor") self.sse_client.start() self.running = True @@ -99,8 +99,8 @@ def stop(self): self.sse_client.stop() def initialized(self): - # return self._ready.is_set() and self._store.initialized() - return self._store.initialized() + return self._ready.is_set() and self._store.initialized() + #return self._store.initialized() def is_alive(self): return self.running and self._store.initialized() @@ -159,7 +159,7 @@ def do_send(should_retry): hdrs = _headers(self._sdk_key) r = yield self._session.post(self._config.events_uri, headers=hdrs, - timeout=(self._config.connect, self._config.read), + timeout=(self._config.connect_timeout, self._config.read_timeout), data=json.dumps(body)) r.raise_for_status() except ProtocolError as e: diff --git a/ldclient/twisted_redis_feature_store.py b/ldclient/twisted_redis_feature_store.py index d7bc1c9c..8ae9fc60 100644 --- a/ldclient/twisted_redis_feature_store.py +++ b/ldclient/twisted_redis_feature_store.py @@ -26,23 +26,31 @@ def __init__(self, self._features_key = "{}:features".format(redis_prefix) self._cache = ForgetfulDict() if expiration == 0 else ExpiringDict(max_len=capacity, max_age_seconds=expiration) - log.info("Created TwistedRedisFeatureStore with url: " + url) + log.info("Created TwistedRedisFeatureStore with url: " + url + " using key: " + self._features_key) def _get_connection(self): client_creator = protocol.ClientCreator(reactor, RedisClient) return client_creator.connectTCP(self._redis_host, self._redis_port) - # @property def initialized(self): - i = self._initialized() - return i - - @defer.inlineCallbacks - def _initialized(self): - r = yield self._get_connection() - """ :type: RedisClient """ - i = yield r.exists(self._features_key) - defer.returnValue(i) + initialized = self._cache.get(INIT_KEY) + if initialized: + # reset ttl + self._cache[INIT_KEY] = True + return True + + @defer.inlineCallbacks + def redis_initialized(): + r = yield self._get_connection() + """ :type: RedisClient """ + i = yield r.exists(self._features_key) + if i: + # reset ttl + self._cache[INIT_KEY] = True + defer.returnValue(i) + + initialized = redis_initialized() + return initialized @defer.inlineCallbacks def upsert(self, key, feature): diff --git a/ldclient/twisted_sse.py b/ldclient/twisted_sse.py index b78c98ef..c35f3533 100644 --- a/ldclient/twisted_sse.py +++ b/ldclient/twisted_sse.py @@ -47,6 +47,7 @@ def connect(self, last_id=None): """ Connect to the event source URL """ + log.info("Connecting to event source: " + self.url) headers = deepcopy(self.headers) if last_id: headers['Last-Event-ID'] = last_id diff --git a/testing/test_integration_twisted.py b/testing/test_integration_twisted.py new file mode 100644 index 00000000..0972d00d --- /dev/null +++ b/testing/test_integration_twisted.py @@ -0,0 +1,87 @@ +import logging +from ldclient import TwistedConfig, TwistedLDClient, LDClient +from ldclient.twisted_sse import Event +import pytest +from testing.server_util import SSEServer, GenericServer +from testing.twisted_util import wait_until, is_equal + +logging.basicConfig(level=logging.DEBUG) + + +@pytest.fixture() +def server(request): + server = GenericServer() + + def fin(): + server.shutdown() + + request.addfinalizer(fin) + return server + + +@pytest.fixture() +def stream(request): + server = SSEServer() + + def fin(): + server.shutdown() + + request.addfinalizer(fin) + return server + + +@pytest.inlineCallbacks +def test_toggle(server): + server.add_feature(feature("foo", "jim")['foo']) + client = TwistedLDClient("apikey", TwistedConfig(base_uri=server.url)) + yield wait_until(is_equal(lambda: client.toggle("foo", user('xyz'), "blah"), "jim")) + + +@pytest.inlineCallbacks +def test_sse_init(server, stream): + stream.queue.put(Event(event="put", data=feature("foo", "jim"))) + client = LDClient("apikey", TwistedConfig( + stream=True, base_uri=server.url, stream_uri=stream.url)) + yield wait_until(is_equal(lambda: client.is_initialized(), True)) + + +@pytest.inlineCallbacks +def test_sse_reconnect(server, stream): + server.post_events() + stream.queue.put(Event(event="put", data=feature("foo", "on"))) + client = LDClient("apikey", TwistedConfig( + stream=True, base_uri=server.url, stream_uri=stream.url)) + yield wait_until(is_equal(lambda: client.toggle("foo", user('xyz'), "blah"), "on")) + + stream.stop() + + yield wait_until(is_equal(lambda: client.toggle("foo", user('xyz'), "blah"), "on")) + + stream.start() + + stream.queue.put(Event(event="put", data=feature("foo", "jim"))) + client = LDClient("apikey", TwistedConfig( + stream=True, base_uri=server.url, stream_uri=stream.url)) + yield wait_until(is_equal(lambda: client.toggle("foo", user('xyz'), "blah"), "jim")) + + +def feature(key, val): + return { + key: {"name": "Feature {}".format(key), "key": key, "kind": "flag", "salt": "Zm9v", "on": val, + "variations": [{"value": val, "weight": 100, + "targets": [{"attribute": "key", "op": "in", "values": []}], + "userTarget": {"attribute": "key", "op": "in", "values": []}}, + {"value": False, "weight": 0, + "targets": [{"attribute": "key", "op": "in", "values": []}], + "userTarget": {"attribute": "key", "op": "in", "values": []}}], + "commitDate": "2015-09-08T21:24:16.712Z", + "creationDate": "2015-09-08T21:06:16.527Z", "version": 4}} + + +def user(name): + return { + u'key': name, + u'custom': { + u'bizzle': u'def' + } + } From e13c40de8c9d255a047f36bc35f165875570f42c Mon Sep 17 00:00:00 2001 From: Dan Richelson Date: Tue, 16 Aug 2016 11:42:46 -0700 Subject: [PATCH 3/9] Implement Twisted for LDD mode only --- CHANGELOG.md | 16 ++ CONTRIBUTING.md | 1 - README.md | 1 - circle.yml | 4 - demo/demo_twisted.py | 21 --- ldclient/__init__.py | 6 - ldclient/client.py | 53 ++++--- ldclient/feature_store.py | 12 +- ldclient/flag.py | 7 +- ldclient/interfaces.py | 14 +- ldclient/redis_feature_store.py | 20 +-- ldclient/twisted_client.py | 80 ++++++++++ ldclient/twisted_event_consumer.py | 88 +++++++++++ ldclient/twisted_impls.py | 192 ------------------------ ldclient/twisted_redis_feature_store.py | 152 +++++++++---------- ldclient/twisted_sse.py | 165 -------------------- ldclient/version.py | 2 +- ldd/README.txt | 20 --- ldd/Vagrantfile | 125 --------------- ldd/bootstrap.sh | 85 ----------- ldd/pytest.ini | 2 - ldd/test_ldd.py | 58 ------- ldd/test_ldd_twisted.py | 57 ------- pytest.ini | 3 - setup.py | 2 +- test-requirements.txt | 1 - testing/server_util.py | 158 ------------------- testing/twisted_util.py | 29 ---- twisted-requirements.txt | 7 +- 29 files changed, 322 insertions(+), 1059 deletions(-) create mode 100644 CHANGELOG.md delete mode 100644 demo/demo_twisted.py create mode 100644 ldclient/twisted_client.py create mode 100644 ldclient/twisted_event_consumer.py delete mode 100644 ldclient/twisted_impls.py delete mode 100644 ldclient/twisted_sse.py delete mode 100644 ldd/README.txt delete mode 100644 ldd/Vagrantfile delete mode 100755 ldd/bootstrap.sh delete mode 100644 ldd/pytest.ini delete mode 100644 ldd/test_ldd.py delete mode 100644 ldd/test_ldd_twisted.py delete mode 100644 pytest.ini delete mode 100644 testing/server_util.py delete mode 100644 testing/twisted_util.py diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 00000000..cc9c77f5 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,16 @@ +# Change log + +All notable changes to the LaunchDarkly Python SDK will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org). + +## [2.0.0] - 2016-08-10 +### Added +- Support for multivariate feature flags. `variation` replaces `toggle` and can return a string, number, dict, or boolean value depending on how the flag is defined. +- New `all_flags` method returns all flag values for a specified user. +- New `secure_mode_hash` function computes a hash suitable for the new LaunchDarkly [JavaScript client's secure mode feature](https://github.com/launchdarkly/js-client#secure-mode). + +### Deprecated +- The `toggle` call has been deprecated in favor of `variation`. + +### Removed +- Twisted support has temporarily been removed. + diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index b564861d..bcfdfe0f 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -16,7 +16,6 @@ Development information (for developing this module itself) pip install -r requirements.txt pip install -r test-requirements.txt - pip install -r twisted-requirements.txt 1. Run tests: You'll need redis running locally on its default port of 6379. diff --git a/README.md b/README.md index b5593a53..daa689d2 100644 --- a/README.md +++ b/README.md @@ -60,7 +60,6 @@ About LaunchDarkly * [Node.JS] (http://docs.launchdarkly.com/docs/node-sdk-reference "LaunchDarkly Node SDK") * [.NET] (http://docs.launchdarkly.com/docs/dotnet-sdk-reference "LaunchDarkly .Net SDK") * [Ruby] (http://docs.launchdarkly.com/docs/ruby-sdk-reference "LaunchDarkly Ruby SDK") - * [Python Twisted] (http://docs.launchdarkly.com/docs/python-twisted "LaunchDarkly Python Twisted SDK") * Explore LaunchDarkly * [launchdarkly.com] (https://launchdarkly.com/ "LaunchDarkly Main Website") for more information * [docs.launchdarkly.com] (http://docs.launchdarkly.com/ "LaunchDarkly Documentation") for our documentation and SDKs diff --git a/circle.yml b/circle.yml index 42ebff9d..eba43aed 100644 --- a/circle.yml +++ b/circle.yml @@ -11,10 +11,6 @@ dependencies: - pyenv shell 3.3.3; $(pyenv which pip) install -r test-requirements.txt - pyenv shell 3.4.2; $(pyenv which pip) install -r test-requirements.txt - - pyenv shell 2.7.10; $(pyenv which pip) install -r twisted-requirements.txt - - pyenv shell 3.3.3; $(pyenv which pip) install -r twisted-requirements.txt - - pyenv shell 3.4.2; $(pyenv which pip) install -r twisted-requirements.txt - - pyenv shell 2.7.10; $(pyenv which python) setup.py install - pyenv shell 3.3.3; $(pyenv which python) setup.py install - pyenv shell 3.4.2; $(pyenv which python) setup.py install diff --git a/demo/demo_twisted.py b/demo/demo_twisted.py deleted file mode 100644 index 2b2cd18b..00000000 --- a/demo/demo_twisted.py +++ /dev/null @@ -1,21 +0,0 @@ -from __future__ import print_function -from ldclient.twisted_impls import TwistedLDClient -from twisted.internet import task, defer - - -@defer.inlineCallbacks -def main(_): - sdk_key = 'whatever' - client = TwistedLDClient(sdk_key) - user = { - u'key': u'xyz', - u'custom': { - u'bizzle': u'def' - } - } - val = yield client.variation('foo', user) - yield client.flush() - print("Value: {}".format(val)) - -if __name__ == '__main__': - task.react(main) diff --git a/ldclient/__init__.py b/ldclient/__init__.py index feecfb74..7c365932 100644 --- a/ldclient/__init__.py +++ b/ldclient/__init__.py @@ -59,9 +59,3 @@ def emit(self, record): else: # noinspection PyUnresolvedReferences __BASE_TYPES__ = (str, float, int, bool, unicode) - - -try: - from .twisted_impls import * -except ImportError: - pass diff --git a/ldclient/client.py b/ldclient/client.py index a8c9b08f..3aab4d7e 100644 --- a/ldclient/client.py +++ b/ldclient/client.py @@ -120,10 +120,7 @@ def __init__(self, sdk_key, config=None, start_wait=5): self._event_consumer.start() if self._config.use_ldd: - if self._store.__class__ == "RedisFeatureStore": - log.info("Started LaunchDarkly Client in LDD mode") - return - log.error("LDD mode requires a RedisFeatureStore.") + log.info("Started LaunchDarkly Client in LDD mode") return if self._config.feature_requester_class: @@ -231,23 +228,35 @@ def send_event(value, version=None): if user.get('key', "") == "": log.warn("User key is blank. Flag evaluation will proceed, but the user will not be stored in LaunchDarkly.") - flag = self._store.get(key) - if not flag: - log.warn("Feature Flag key: " + key + " not found in Feature Store. Returning default.") - send_event(default) + def cb(flag): + try: + if not flag: + log.warn("Feature Flag key: " + key + " not found in Feature Store. Returning default.") + send_event(default) + return default + + return self._evaluate_and_send_events(flag, user, default) + + except Exception as e: + log.error("Exception caught in variation: " + e.message + " for flag key: " + key + " and user: " + str(user)) + return default - value, events = evaluate(flag, user, self._store) + return self._store.get(key, cb) + + def _evaluate(self, flag, user): + return evaluate(flag, user, self._store) + + def _evaluate_and_send_events(self, flag, user, default): + value, events = self._evaluate(flag, user) for event in events or []: self._send_event(event) - log.debug("Sending event: " + str(event)) - - if value is not None: - send_event(value, flag.get('version')) - return value - send_event(default, flag.get('version')) - return default + if value is None: + value = default + self._send_event({'kind': 'feature', 'key': flag.get('key'), + 'user': user, 'value': value, 'default': default, 'version': flag.get('version')}) + return value def all_flags(self, user): if self._config.offline: @@ -262,7 +271,17 @@ def all_flags(self, user): log.warn("User or user key is None when calling all_flags(). Returning None.") return None - return {k: evaluate(v, user, self._store)[0] for k, v in self._store.all().items() or {}} + def cb(all_flags): + try: + return self._evaluate_multi(user, all_flags) + except Exception as e: + log.error("Exception caught in all_flags: " + e.message + " for user: " + str(user)) + return {} + + return self._store.all(cb) + + def _evaluate_multi(self, user, flags): + return {k: self._evaluate(v, user)[0] for k, v in flags.items() or {}} def secure_mode_hash(self, user): if user.get('key') is None: diff --git a/ldclient/feature_store.py b/ldclient/feature_store.py index f24335d2..e5a0f237 100644 --- a/ldclient/feature_store.py +++ b/ldclient/feature_store.py @@ -10,24 +10,24 @@ def __init__(self): self._initialized = False self._features = {} - def get(self, key): + def get(self, key, callback): try: self._lock.rlock() f = self._features.get(key) if f is None: log.debug("Attempted to get missing feature: " + str(key) + " Returning None") - return None + return callback(None) if 'deleted' in f and f['deleted']: log.debug("Attempted to get deleted feature: " + str(key) + " Returning None") - return None - return f + return callback(None) + return callback(f) finally: self._lock.runlock() - def all(self): + def all(self, callback): try: self._lock.rlock() - return dict((k, f) for k, f in self._features.items() if ('deleted' not in f) or not f['deleted']) + return callback(dict((k, f) for k, f in self._features.items() if ('deleted' not in f) or not f['deleted'])) finally: self._lock.runlock() diff --git a/ldclient/flag.py b/ldclient/flag.py index faa117ff..34211c8e 100644 --- a/ldclient/flag.py +++ b/ldclient/flag.py @@ -21,10 +21,7 @@ def evaluate(flag, user, store): if value is not None: return value, prereq_events - if 'offVariation' in flag and flag['offVariation']: - value = _get_variation(flag, flag['offVariation']) - return value, prereq_events - return None, prereq_events + return _get_off_variation(flag), prereq_events def _evaluate(flag, user, store, prereq_events=None): @@ -32,7 +29,7 @@ def _evaluate(flag, user, store, prereq_events=None): failed_prereq = None prereq_value = None for prereq in flag.get('prerequisites') or []: - prereq_flag = store.get(prereq.get('key')) + prereq_flag = store.get(prereq.get('key'), lambda x: x) if prereq_flag is None: log.warn("Missing prereq flag: " + prereq.get('key')) failed_prereq = prereq diff --git a/ldclient/interfaces.py b/ldclient/interfaces.py index d6504503..fdc4d408 100644 --- a/ldclient/interfaces.py +++ b/ldclient/interfaces.py @@ -8,18 +8,18 @@ class FeatureStore(object): __metaclass__ = ABCMeta @abstractmethod - def get(self, key): + def get(self, key, callback): """ - Gets the data for a feature flag for evaluation - - :param key: The feature flag key + Gets a feature and calls the callback with the feature data to return the result + :param key: The feature key :type key: str - :return: The feature flag data - :rtype: dict + :param callback: The function that accepts the feature data and returns the feature value + :type callback: function + :return: The feature value. None if not found """ @abstractmethod - def all(self): + def all(self, callback): """ Returns all feature flags and their data diff --git a/ldclient/redis_feature_store.py b/ldclient/redis_feature_store.py index 5e979669..2ee9cc89 100644 --- a/ldclient/redis_feature_store.py +++ b/ldclient/redis_feature_store.py @@ -42,42 +42,42 @@ def init(self, features): pipe.execute() log.info("Initialized RedisFeatureStore with " + str(len(features)) + " feature flags") - def all(self): + def all(self, callback): r = redis.Redis(connection_pool=self._pool) all_features = r.hgetall(self._features_key) if all_features is None or all_features is "": log.warn("RedisFeatureStore: call to get all flags returned no results. Returning None.") - return None + return callback(None) results = {} for k, f_json in all_features.items() or {}: f = json.loads(f_json.decode('utf-8')) if 'deleted' in f and f['deleted'] is False: results[f['key']] = f - return results + return callback(results) - def get(self, key): + def get(self, key, callback): f = self._cache.get(key) if f is not None: # reset ttl self._cache[key] = f if f.get('deleted', False) is True: log.warn("RedisFeatureStore: get returned deleted flag from in-memory cache. Returning None.") - return None - return f + return callback(None) + return callback(f) r = redis.Redis(connection_pool=self._pool) f_json = r.hget(self._features_key, key) if f_json is None or f_json is "": log.warn("RedisFeatureStore: feature flag with key: " + key + " not found in Redis. Returning None.") - return None + return callback(None) f = json.loads(f_json.decode('utf-8')) if f.get('deleted', False) is True: log.warn("RedisFeatureStore: get returned deleted flag from Redis. Returning None.") - return None + return callback(None) self._cache[key] = f - return f + return callback(f) def delete(self, key, version): r = redis.Redis(connection_pool=self._pool) @@ -112,7 +112,7 @@ def initialized(self): def upsert(self, key, feature): r = redis.Redis(connection_pool=self._pool) r.watch(self._features_key) - old = self.get(key) + old = self.get(key, lambda x: x) if old: if old['version'] >= feature['version']: r.unwatch() diff --git a/ldclient/twisted_client.py b/ldclient/twisted_client.py new file mode 100644 index 00000000..01ba3e90 --- /dev/null +++ b/ldclient/twisted_client.py @@ -0,0 +1,80 @@ +from functools import partial + +from twisted.internet import defer +from twisted.internet.defer import DeferredList + +from ldclient import LDClient +from ldclient import log +from ldclient.flag import _get_variation, _evaluate_index, _get_off_variation + + +class TwistedLDClient(LDClient): + @defer.inlineCallbacks + def _evaluate_and_send_events(self, flag, user, default): + value = yield self._evaluate(flag, user) + if value is None: + value = default + log.info("value: " + str(value)) + self._send_event({'kind': 'feature', 'key': flag.get('key'), 'user': user, 'value': value, + 'default': default, 'version': flag.get('version')}) + defer.returnValue(value) + + def _evaluate(self, flag, user): + if flag.get('on', False): + def cb(result): + if result is not None: + return result + return _get_off_variation(flag) + + value = self._evaluate_internal(flag, user) + value.addBoth(cb) + return value + + return _get_off_variation(flag) + + def _evaluate_internal(self, flag, user): + def check_prereq_results(result): + prereq_ok = True + for r in result: # r is a tuple of 2 booleans: (error, prereqMatches) + if r[0] is False or r[1] is False: + prereq_ok = False + + if prereq_ok is True: + index = _evaluate_index(flag, user) + variation = _get_variation(flag, index) + return variation + return None + + results = DeferredList(map(partial(self._evaluate_prereq, user), flag.get('prerequisites') or [])) + results.addBoth(check_prereq_results) + return results + + # returns False if the prereq failed or there was an error evaluating it. Otherwise returns True + def _evaluate_prereq(self, user, prereq): + + @defer.inlineCallbacks + def eval_prereq(prereq_flag): + if prereq_flag is None: + log.warn("Missing prereq flag: " + prereq.get('key')) + defer.returnValue(False) + if prereq_flag.get('on', False) is True: + prereq_value = yield self._evaluate_internal(prereq_flag, user) + variation = _get_variation(prereq_flag, prereq.get('variation')) + if prereq_value is None or not prereq_value == variation: + ok = False + else: + ok = True + else: + ok = False + defer.returnValue(ok) + + result = self._store.get(prereq.get('key'), eval_prereq) + return result + + @defer.inlineCallbacks + def _evaluate_multi(self, user, flags): + results = {} + for k, v in flags.items() or {}: + r = yield self._evaluate(v, user) + results[k] = r + defer.returnValue(results) diff --git a/ldclient/twisted_event_consumer.py b/ldclient/twisted_event_consumer.py new file mode 100644 index 00000000..286d8389 --- /dev/null +++ b/ldclient/twisted_event_consumer.py @@ -0,0 +1,88 @@ +from __future__ import absolute_import + +import errno +import json + +import txrequests +from cachecontrol import CacheControl +from queue import Empty +from requests.packages.urllib3.exceptions import ProtocolError +from twisted.internet import task, defer + +from ldclient.interfaces import EventConsumer +from ldclient.util import _headers, log + + +class TwistedEventConsumer(EventConsumer): + + def __init__(self, queue, sdk_key, config): + self._queue = queue + """ @type: queue.Queue """ + + self._session = CacheControl(txrequests.Session()) + """ :type: txrequests.Session """ + + self._sdk_key = sdk_key + self._config = config + """ :type: ldclient.twisted.TwistedConfig """ + + self._looping_call = None + """ :type: LoopingCall""" + + def start(self): + self._looping_call = task.LoopingCall(self._consume) + self._looping_call.start(5) + + def stop(self): + self._looping_call.stop() + + def is_alive(self): + return self._looping_call is not None and self._looping_call.running + + def flush(self): + return self._consume() + + def _consume(self): + items = [] + try: + while True: + items.append(self._queue.get_nowait()) + except Empty: + pass + + if items: + return self.send_batch(items) + + @defer.inlineCallbacks + def send_batch(self, events): + @defer.inlineCallbacks + def do_send(should_retry): + # noinspection PyBroadException + try: + if isinstance(events, dict): + body = [events] + else: + body = events + hdrs = _headers(self._sdk_key) + r = yield self._session.post(self._config.events_uri, + headers=hdrs, + timeout=(self._config.connect_timeout, self._config.read_timeout), + data=json.dumps(body)) + r.raise_for_status() + except ProtocolError as e: + inner = e.args[1] + if inner.errno == errno.ECONNRESET and should_retry: + log.warning( + 'ProtocolError exception caught while sending events. Retrying.') + yield do_send(False) + else: + log.exception( + 'Unhandled exception in event consumer. Analytics events were not processed.') + except: + log.exception( + 'Unhandled exception in event consumer. Analytics events were not processed.') + try: + yield do_send(True) + finally: + for _ in events: + self._queue.task_done() diff --git a/ldclient/twisted_impls.py b/ldclient/twisted_impls.py deleted file mode 100644 index 7ce1cf90..00000000 --- a/ldclient/twisted_impls.py +++ /dev/null @@ -1,192 +0,0 @@ -from __future__ import absolute_import -from functools import partial -import json -from queue import Empty -import errno - -from cachecontrol import CacheControl -from ldclient.client import Config, LDClient -from ldclient.interfaces import FeatureRequester, EventConsumer, UpdateProcessor -from ldclient.streaming import StreamingUpdateProcessor -from ldclient.twisted_sse import TwistedSSEClient -from ldclient.util import _headers, _stream_headers, log -from requests.packages.urllib3.exceptions import ProtocolError -from twisted.internet import task, defer -import txrequests - - -class TwistedHttpFeatureRequester(FeatureRequester): - def __init__(self, sdk_key, config): - self._sdk_key = sdk_key - self._session = CacheControl(txrequests.Session()) - self._config = config - - def get_all(self): - @defer.inlineCallbacks - def run(should_retry): - # noinspection PyBroadException - try: - val = yield self._get_all(self) - defer.returnValue(val) - except ProtocolError as e: - inner = e.args[1] - if inner.errno == errno.ECONNRESET and should_retry: - log.warning( - 'ProtocolError exception caught while getting flags. Retrying.') - d = yield run(False) - defer.returnValue(d) - else: - log.exception('Unhandled exception.') - defer.returnValue(None) - except Exception: - log.exception('Unhandled exception.') - defer.returnValue(None) - - return run(True) - - @defer.inlineCallbacks - def _get_all(self): - hdrs = _headers(self._sdk_key) - uri = self._config.get_latest_features_uri - r = yield self._session.get(uri, headers=hdrs, timeout=(self._config.connect, self._config.read)) - r.raise_for_status() - feature = r.json() - defer.returnValue(feature) - - -class TwistedConfig(Config): - def __init__(self, *args, **kwargs): - self.update_processor_class = TwistedStreamProcessor - self.event_consumer_class = TwistedEventConsumer - self.feature_requester_class = TwistedHttpFeatureRequester - super(TwistedConfig, self).__init__(*args, **kwargs) - - -class TwistedStreamProcessor(UpdateProcessor): - def close(self): - self.sse_client.stop() - - def __init__(self, sdk_key, config, requester, store, ready): - self._uri = config.stream_uri - self._store = store - self._requester = requester - self._ready = ready - - def process(): - init_ok = partial(StreamingUpdateProcessor.process_message, - self._store, - self._requester, - self._ready) - if init_ok is True: - self._ready.set() - - self.sse_client = TwistedSSEClient(config.stream_uri, - headers=_stream_headers(sdk_key, "PythonTwistedClient"), - verify_ssl=config.verify_ssl, - on_event=partial(StreamingUpdateProcessor.process_message, - self._store, - self._requester, - self._ready)) - self.running = False - log.info("Created TwistedStreamProcessor connecting to uri: " + self._uri + " using feature store: " + str(self._store)) - - def start(self): - log.info("Starting TwistedStreamProcessor") - self.sse_client.start() - self.running = True - - def stop(self): - self.sse_client.stop() - - def initialized(self): - return self._ready.is_set() and self._store.initialized() - #return self._store.initialized() - - def is_alive(self): - return self.running and self._store.initialized() - - -class TwistedEventConsumer(EventConsumer): - def __init__(self, queue, sdk_key, config): - self._queue = queue - """ @type: queue.Queue """ - - self._session = CacheControl(txrequests.Session()) - """ :type: txrequests.Session """ - - self._sdk_key = sdk_key - self._config = config - """ :type: ldclient.twisted.TwistedConfig """ - - self._looping_call = None - """ :type: LoopingCall""" - - def start(self): - log.info("Starting TwistedEventConsumer") - self._looping_call = task.LoopingCall(self._consume) - self._looping_call.start(5) - - def stop(self): - self._looping_call.stop() - - def is_alive(self): - return self._looping_call is not None and self._looping_call.running - - def flush(self): - return self._consume() - - def _consume(self): - items = [] - try: - while True: - items.append(self._queue.get_nowait()) - except Empty: - pass - - if items: - return self.send_batch(items) - - @defer.inlineCallbacks - def send_batch(self, events): - @defer.inlineCallbacks - def do_send(should_retry): - # noinspection PyBroadException - try: - if isinstance(events, dict): - body = [events] - else: - body = events - hdrs = _headers(self._sdk_key) - r = yield self._session.post(self._config.events_uri, - headers=hdrs, - timeout=(self._config.connect_timeout, self._config.read_timeout), - data=json.dumps(body)) - r.raise_for_status() - except ProtocolError as e: - inner = e.args[1] - if inner.errno == errno.ECONNRESET and should_retry: - log.warning( - 'ProtocolError exception caught while sending events. Retrying.') - yield do_send(False) - else: - log.exception( - 'Unhandled exception in event consumer. Analytics events were not processed.') - except: - log.exception( - 'Unhandled exception in event consumer. Analytics events were not processed.') - - try: - yield do_send(True) - finally: - for _ in events: - self._queue.task_done() - - -class TwistedLDClient(LDClient): - def __init__(self, sdk_key, config=None): - if config is None: - config = TwistedConfig() - LDClient.__init__(self, sdk_key, config) - - -__all__ = ['TwistedConfig', 'TwistedLDClient'] diff --git a/ldclient/twisted_redis_feature_store.py b/ldclient/twisted_redis_feature_store.py index 8ae9fc60..2307a335 100644 --- a/ldclient/twisted_redis_feature_store.py +++ b/ldclient/twisted_redis_feature_store.py @@ -1,7 +1,7 @@ from __future__ import absolute_import import json -from urlparse import urlparse +import urlparse from twisted.internet import defer from twisted.internet import protocol, reactor @@ -20,7 +20,7 @@ def __init__(self, capacity=1000, redis_prefix='launchdarkly'): self._url = url - parsed_url = urlparse(url) + parsed_url = urlparse.urlparse(url) self._redis_host = parsed_url.hostname self._redis_port = parsed_url.port self._features_key = "{}:features".format(redis_prefix) @@ -52,90 +52,82 @@ def redis_initialized(): initialized = redis_initialized() return initialized - @defer.inlineCallbacks def upsert(self, key, feature): - r = yield self._get_connection() - """ :type: RedisClient """ - r.watch(self._features_key) - old = yield self.get(key) - if old: - if old['version'] >= feature['version']: - r.unwatch() - return - - feature_json = json.dumps(feature) - r.hset(self._features_key, key, feature_json) - self._cache[key] = feature - r.unwatch() - - @defer.inlineCallbacks - def all(self): - r = yield self._get_connection() - """ :type: RedisClient """ - all_features = yield r.hgetall(self._features_key) - if all_features is None or all_features is "": - log.warn("TwistedRedisFeatureStore: call to get all flags returned no results. Returning None.") + raise NotImplementedError() + + def all(self, callback): + @defer.inlineCallbacks + def redis_get_all(): + r = None + try: + r = yield self._get_connection() + """ :type: RedisClient """ + all_features = yield r.hgetall(self._features_key) + if all_features is None or all_features is "": + log.warn("TwistedRedisFeatureStore: call to get all flags returned no results. Returning None.") + defer.returnValue(None) + + results = {} + for k, f_json in all_features.items() or {}: + f = json.loads(f_json.decode('utf-8')) + if 'deleted' in f and f['deleted'] is False: + results[f['key']] = f + defer.returnValue(results) + except Exception as e: + log.error("Could not connect to Redis using url: " + self._url + " with error message: " + e.message) + defer.returnValue(None) + finally: + if r: + r.quit() defer.returnValue(None) - results = {} - for k, f_json in all_features.items() or {}: - f = json.loads(f_json.decode('utf-8')) - if 'deleted' in f and f['deleted'] is False: - results[f['key']] = f - defer.returnValue(results) + all_flags = redis_get_all() + all_flags.addBoth(callback) + return all_flags - @defer.inlineCallbacks def delete(self, key, version): - r = yield self._get_connection() - """ :type: RedisClient """ - r.watch(self._features_key) - f_json = yield r.hget(self._features_key, key) - if f_json: - f = json.loads(f_json.decode('utf-8')) - if f is not None and f['version'] < version: - f['deleted'] = True - f['version'] = version - elif f is None: - f = {'deleted': True, 'version': version} - f_json = json.dumps(f) - r.hset(self._features_key, key, f_json) - self._cache[key] = f - r.unwatch() - - @defer.inlineCallbacks + raise NotImplementedError() + def init(self, features): - r = yield self._get_connection() - """ :type: RedisClient """ - - r.multi() - r.delete(self._features_key) - self._cache.clear() - - for k, f in features.items(): - f_json = json.dumps(f) - r.hset(self._features_key, k, f_json) - self._cache[k] = f - r.execute() - log.info("Initialized TwistedRedisFeatureStore with " + str(len(features)) + " feature flags") - - @defer.inlineCallbacks - def get(self, key): - cached = self._cache.get(key) - if cached is not None: - defer.returnValue(cached) - else: - r = yield self._get_connection() - """ :type: RedisClient """ - f_json = yield r.hget(self._features_key, key) - if f_json is None or f_json is "": - log.warn( - "TwistedRedisFeatureStore: feature flag with key: " + key + " not found in Redis. Returning None.") - defer.returnValue(None) + raise NotImplementedError() - f = json.loads(f_json.decode('utf-8')) - if f.get('deleted', False) is True: - log.warn("TwistedRedisFeatureStore: get returned deleted flag from Redis. Returning None.") + def get(self, key, callback): + @defer.inlineCallbacks + def redis_get(): + r = None + try: + r = yield self._get_connection() + """ :type: RedisClient """ + get_result = yield r.hget(self._features_key, key) + if not get_result: + log.warn("Didn't get response from redis for key: " + key + " Returning None.") + defer.returnValue(None) + f_json = get_result.get(key) + if f_json is None or f_json is "": + log.warn( + "TwistedRedisFeatureStore: feature flag with key: " + key + " not found in Redis. Returning None.") + defer.returnValue(None) + + f = json.loads(f_json.decode('utf-8')) + if f.get('deleted', False) is True: + log.warn("TwistedRedisFeatureStore: get returned deleted flag from Redis. Returning None.") + defer.returnValue(None) + self._cache[key] = f + defer.returnValue(f) + except Exception as e: + log.error("Could not connect to Redis using url: " + self._url + " with error message: " + e.message) defer.returnValue(None) + finally: + if r: + r.quit() + defer.returnValue(None) + + cached = self._cache.get(key) + if cached is not None: + # reset ttl + self._cache[key] = cached + return callback(cached) - self._cache[key] = f - defer.returnValue(f) + f = redis_get() + f.addBoth(callback) + return f diff --git a/ldclient/twisted_sse.py b/ldclient/twisted_sse.py deleted file mode 100644 index c35f3533..00000000 --- a/ldclient/twisted_sse.py +++ /dev/null @@ -1,165 +0,0 @@ -from __future__ import absolute_import - -from copy import deepcopy -from ldclient.util import log, Event -from twisted.internet.defer import Deferred -from twisted.internet.ssl import ClientContextFactory -from twisted.web.client import Agent -from twisted.web.http_headers import Headers -from twisted.protocols.basic import LineReceiver - - -class NoValidationContextFactory(ClientContextFactory): - - def getContext(self, *_): - return ClientContextFactory.getContext(self) - - -class TwistedSSEClient(object): - - def __init__(self, url, headers, verify_ssl, on_event): - self.url = url - self.verify_ssl = verify_ssl - self.headers = headers - self.on_event = on_event - self.on_error_retry = 30 - self.running = False - self.current_request = None - - def reconnect(self, old_protocol): - """ - :type old_protocol: EventSourceProtocol - """ - if not self.running: - return - - retry = old_protocol.retry - if not retry: - retry = 5 - from twisted.internet import reactor - reactor.callLater(retry, self.connect, old_protocol.last_id) - - def start(self): - self.running = True - self.connect() - - def connect(self, last_id=None): - """ - Connect to the event source URL - """ - log.info("Connecting to event source: " + self.url) - headers = deepcopy(self.headers) - if last_id: - headers['Last-Event-ID'] = last_id - headers = dict([(x, [y.encode('utf-8')]) for x, y in headers.items()]) - url = self.url.encode('utf-8') - from twisted.internet import reactor - if self.verify_ssl: - agent = Agent(reactor) - else: - agent = Agent(reactor, NoValidationContextFactory()) - - d = agent.request( - 'GET', - url, - Headers(headers), - None) - self.current_request = d - d.addErrback(self.on_connect_error) - d.addCallback(self.on_response) - - def stop(self): - if self.running and self.current_request: - self.current_request.cancel() - - def on_response(self, response): - from twisted.internet import reactor - if response.code != 200: - log.error("non 200 response received: %d" % response.code) - reactor.callLater(self.on_error_retry, self.connect) - else: - finished = Deferred() - protocol = EventSourceProtocol(self.on_event, finished) - finished.addBoth(self.reconnect) - response.deliverBody(protocol) - return finished - - def on_connect_error(self, ignored): - """ - :type ignored: twisted.python.Failure - """ - from twisted.internet import reactor - ignored.printTraceback() - log.error("error connecting to endpoint {}: {}".format( - self.url, ignored.getTraceback())) - reactor.callLater(self.on_error_retry, self.connect) - - -class EventSourceProtocol(LineReceiver): - - def __init__(self, on_event, finished_deferred): - self.finished = finished_deferred - self.on_event = on_event - # Initialize the event and data buffers - self.event = '' - self.data = '' - self.id = None - self.last_id = None - self.retry = 5 # 5 second retry default - self.reset() - self.delimiter = b'\n' - - def reset(self): - self.event = 'message' - self.data = '' - self.id = None - self.retry = None - - def lineReceived(self, line): - if line == '': - # Dispatch event - self.dispatch_event() - else: - try: - field, value = line.split(':', 1) - # If value starts with a space, strip it. - value = lstrip(value) - except ValueError: - # We got a line with no colon, treat it as a field(ignore) - return - - if field == '': - # This is a comment; ignore - pass - elif field == 'data': - self.data += value + '\n' - elif field == 'event': - self.event = value - elif field == 'id': - self.id = value - pass - elif field == 'retry': - self.retry = value - pass - - def connectionLost(self, *_): - self.finished.callback(self) - - def dispatch_event(self): - """ - Dispatch the event - """ - # If last character is LF, strip it. - if self.data.endswith('\n'): - self.data = self.data[:-1] - log.debug("Dispatching event %s[%s]: %s", - self.event, self.id, self.data) - event = Event(self.data, self.event, self.id, self.retry) - self.on_event(event) - if self.id: - self.last_id = self.id - self.reset() - - -def lstrip(value): - return value[1:] if value.startswith(' ') else value diff --git a/ldclient/version.py b/ldclient/version.py index 164c787d..21014090 100644 --- a/ldclient/version.py +++ b/ldclient/version.py @@ -1 +1 @@ -VERSION = "2.0.0-beta3" +VERSION = "2.0.0" diff --git a/ldd/README.txt b/ldd/README.txt deleted file mode 100644 index d6e8d997..00000000 --- a/ldd/README.txt +++ /dev/null @@ -1,20 +0,0 @@ -To run the tests, run: - - vagrant up --provision - vagrant ssh - cd project/ldd - -Then run the desired test: - - # redis + python 2 + sync - py2/bin/py.test test_ldd.py - - # twisted + python 2 - py2/bin/py.test --twisted test_ldd_twisted.py - - # redis + python + sync - py3/bin/py.test test_ldd.py - -If the tests don't work, you may need to restart ldd as probably went into backoff mode: - - sudo service ldd restart diff --git a/ldd/Vagrantfile b/ldd/Vagrantfile deleted file mode 100644 index 92f644b0..00000000 --- a/ldd/Vagrantfile +++ /dev/null @@ -1,125 +0,0 @@ -# -*- mode: ruby -*- -# vi: set ft=ruby : - -# Vagrantfile API/syntax version. Don't touch unless you know what you're doing! -VAGRANTFILE_API_VERSION = "2" - -Vagrant.configure(VAGRANTFILE_API_VERSION) do |config| - # All Vagrant configuration is done here. The most common configuration - # options are documented and commented below. For a complete reference, - # please see the online documentation at vagrantup.com. - - # Every Vagrant virtual environment requires a box to build off of. - config.vm.box = "ubuntu/trusty64" - - # The url from where the 'config.vm.box' box will be fetched if it - # doesn't already exist on the user's system. - config.vm.box_url = "https://vagrantcloud.com/ubuntu/boxes/trusty64" - - config.vm.provision :shell, path: "bootstrap.sh" - - # Create a forwarded port mapping which allows access to a specific port - # within the machine from a port on the host machine. In the example below, - # accessing "localhost:8080" will access port 80 on the guest machine. - # config.vm.network :forwarded_port, guest: 80, host: 8080 - - # Create a private network, which allows host-only access to the machine - # using a specific IP. - # config.vm.network :private_network, ip: "192.168.33.10" - - # Create a public network, which generally matched to bridged network. - # Bridged networks make the machine appear as another physical device on - # your network. - # config.vm.network :public_network - - # If true, then any SSH connections made will enable agent forwarding. - # Default value: false - # config.ssh.forward_agent = true - - # Share an additional folder to the guest VM. The first argument is - # the path on the host to the actual folder. The second argument is - # the path on the guest to mount the folder. And the optional third - # argument is a set of non-required options. - config.vm.synced_folder "..", "/home/vagrant/project" - - # Provider-specific configuration so you can fine-tune various - # backing providers for Vagrant. These expose provider-specific options. - # Example for VirtualBox: - # - # config.vm.provider :virtualbox do |vb| - # # Don't boot with headless mode - # vb.gui = true - # - # # Use VBoxManage to customize the VM. For example to change memory: - # vb.customize ["modifyvm", :id, "--memory", "1024"] - # end - # - # View the documentation for the provider you're using for more - # information on available options. - config.vm.provider :virtualbox do |vb| - vb.auto_nat_dns_proxy = false - vb.customize ["modifyvm", :id, "--natdnsproxy1", "off" ] - vb.customize ["modifyvm", :id, "--natdnshostresolver1", "off" ] - end - - # Enable provisioning with Puppet stand alone. Puppet manifests - # are contained in a directory path relative to this Vagrantfile. - # You will need to create the manifests directory and a manifest in - # the file canonical-ubuntu-12.04.pp in the manifests_path directory. - # - # An example Puppet manifest to provision the message of the day: - # - # # group { "puppet": - # # ensure => "present", - # # } - # # - # # File { owner => 0, group => 0, mode => 0644 } - # # - # # file { '/etc/motd': - # # content => "Welcome to your Vagrant-built virtual machine! - # # Managed by Puppet.\n" - # # } - # - # config.vm.provision :puppet do |puppet| - # puppet.manifests_path = "manifests" - # puppet.manifest_file = "site.pp" - # end - - # Enable provisioning with chef solo, specifying a cookbooks path, roles - # path, and data_bags path (all relative to this Vagrantfile), and adding - # some recipes and/or roles. - # - # config.vm.provision :chef_solo do |chef| - # chef.cookbooks_path = "../my-recipes/cookbooks" - # chef.roles_path = "../my-recipes/roles" - # chef.data_bags_path = "../my-recipes/data_bags" - # chef.add_recipe "mysql" - # chef.add_role "web" - # - # # You may also specify custom JSON attributes: - # chef.json = { :mysql_password => "foo" } - # end - - # Enable provisioning with chef server, specifying the chef server URL, - # and the path to the validation key (relative to this Vagrantfile). - # - # The Opscode Platform uses HTTPS. Substitute your organization for - # ORGNAME in the URL and validation key. - # - # If you have your own Chef Server, use the appropriate URL, which may be - # HTTP instead of HTTPS depending on your configuration. Also change the - # validation key to validation.pem. - # - # config.vm.provision :chef_client do |chef| - # chef.chef_server_url = "https://api.opscode.com/organizations/ORGNAME" - # chef.validation_key_path = "ORGNAME-validator.pem" - # end - # - # If you're using the Opscode platform, your validator client is - # ORGNAME-validator, replacing ORGNAME with your organization name. - # - # If you have your own Chef Server, the default validation client name is - # chef-validator, unless you changed the configuration. - # - # chef.validation_client_name = "ORGNAME-validator" -end diff --git a/ldd/bootstrap.sh b/ldd/bootstrap.sh deleted file mode 100755 index 6a8cf631..00000000 --- a/ldd/bootstrap.sh +++ /dev/null @@ -1,85 +0,0 @@ -#!/bin/bash - -# init -apt-get update 2> /dev/null - -# redis -apt-get install -y redis-server 2> /dev/null - -# ntp -apt-get install ntp -y 2> /dev/null -service ntp restart - -# install dependencies and services -apt-get install unzip -y 2> /dev/null -apt-get install -y vim curl 2> /dev/null -apt-get install git -y 2> /dev/null - -# Python things -echo "Install python" -apt-get install -y build-essentials 2> /dev/null -apt-get install -y python-pip 2> /dev/null -apt-get install -y python-virtualenv 2> /dev/null -apt-get install -y python-dev 2> /dev/null -echo "install other things" -apt-get install -y libssl-dev libsqlite3-dev libbz2-dev 2> /dev/null -apt-get install -y libffi-dev 2> /dev/null -wget -q https://www.python.org/ftp/python/3.4.3/Python-3.4.3.tgz -tar xfvz Python-3.4.3.tgz -cd Python-3.4.3/ -./configure 2> /dev/null -make 2> /dev/null -sudo make install 2> /dev/null -rm /usr/bin/python3.4 - -# set vim tabs -cat < /home/vagrant/.vimrc -set tabstop=4 -EOF -chown vagrant.vagrant /home/vagrant/.vimrc - -# install ldd -cd /home/vagrant -wget -q https://github.com/launchdarkly/ldd/releases/download/ca7092/ldd_linux_amd64.tar.gz -tar xfvz ldd_linux_amd64.tar.gz -cat < /home/vagrant/ldd_linux_amd64/ldd.conf -[redis] -host = "localhost" -port = 6379 - -[main] -sdkKey = "YOUR_SDK_KEY" -prefix = "launchdarkly" -streamUri = "http://localhost:8000" -EOF -cat < /etc/init/ldd.conf -description "Run LaunchDarkly Daemon" - -# no start option as you might not want it to auto-start -# This might not be supported - you might need a: start on runlevel [3] -start on runlevel [2345] stop on runlevel [!2345] - -# if you want it to automatically restart if it crashes, leave the next line in -respawn - -script - cd /home/vagrant/ldd_linux_amd64 - su -c "./ldd" vagrant -end script -EOF -service ldd restart -# install project node_modules -su - vagrant -cd /home/vagrant/project/ldd - - -virtualenv py2 -py2/bin/pip install -U -r ../requirements.txt -py2/bin/pip install -U -r ../test-requirements.txt -py2/bin/pip install -U -r ../twisted-requirements.txt -py2/bin/pip install -U -r ../redis-requirements.txt - -pyvenv py3 -py3/bin/pip install -U -r ../requirements.txt -py3/bin/pip install -U -r ../test-requirements.txt -py3/bin/pip install -U -r ../redis-requirements.txt \ No newline at end of file diff --git a/ldd/pytest.ini b/ldd/pytest.ini deleted file mode 100644 index f1d7d693..00000000 --- a/ldd/pytest.ini +++ /dev/null @@ -1,2 +0,0 @@ -[pytest] -twisted = 0 diff --git a/ldd/test_ldd.py b/ldd/test_ldd.py deleted file mode 100644 index 1b7f7dc5..00000000 --- a/ldd/test_ldd.py +++ /dev/null @@ -1,58 +0,0 @@ -from functools import partial -import sys - -from ldclient.redis_feature_store import RedisFeatureStore - -sys.path.append("..") -sys.path.append("../testing") - -from ldclient.util import Event -import logging -from ldclient.client import Config, LDClient -import pytest -from testing.server_util import SSEServer -from testing.sync_util import wait_until - -logging.basicConfig(level=logging.DEBUG) - - -@pytest.fixture() -def stream(request): - server = SSEServer(port=8000) - - def fin(): - server.shutdown() - - request.addfinalizer(fin) - return server - - -def test_sse_init(stream): - stream.queue.put(Event(event="put", data=feature("foo", "jim"))) - client = LDClient("apikey", Config(use_ldd=True, - feature_store=RedisFeatureStore(), - events_enabled=False)) - wait_until(lambda: client.variation( - "foo", user('xyz'), "blah") == "jim", timeout=10) - - -def feature(key, val): - return { - key: {"name": "Feature {}".format(key), "key": key, "kind": "flag", "salt": "Zm9v", "on": True, - "variations": [{"value": val, "weight": 100, - "targets": [{"attribute": "key", "op": "in", "values": []}], - "userTarget": {"attribute": "key", "op": "in", "values": []}}, - {"value": False, "weight": 0, - "targets": [{"attribute": "key", "op": "in", "values": []}], - "userTarget": {"attribute": "key", "op": "in", "values": []}}], - "commitDate": "2015-09-08T21:24:16.712Z", - "creationDate": "2015-09-08T21:06:16.527Z", "version": 4}} - - -def user(name): - return { - u'key': name, - u'custom': { - u'bizzle': u'def' - } - } diff --git a/ldd/test_ldd_twisted.py b/ldd/test_ldd_twisted.py deleted file mode 100644 index cb33a139..00000000 --- a/ldd/test_ldd_twisted.py +++ /dev/null @@ -1,57 +0,0 @@ -import sys -sys.path.append("..") -sys.path.append("../testing") - -from ldclient.noop import NoOpFeatureRequester -from ldclient import TwistedConfig -from ldclient.twisted_redis import create_redis_ldd_processor -from testing.twisted_util import is_equal, wait_until -from ldclient.util import Event -import logging -from ldclient.client import LDClient -import pytest -from testing.server_util import SSEServer - -logging.basicConfig(level=logging.DEBUG) - - -@pytest.fixture() -def stream(request): - server = SSEServer(port=8000) - - def fin(): - server.shutdown() - - request.addfinalizer(fin) - return server - - -@pytest.inlineCallbacks -def test_sse_init(stream): - stream.queue.put(Event(event="put", data=feature("foo", "jim"))) - client = LDClient("apikey", TwistedConfig(stream=True, update_processor_class=create_redis_ldd_processor, - feature_requester_class=NoOpFeatureRequester, - events=False)) - yield wait_until(is_equal(lambda: client.toggle("foo", user('xyz'), "blah"), "jim")) - - -def feature(key, val): - return { - key: {"name": "Feature {}".format(key), "key": key, "kind": "flag", "salt": "Zm9v", "on": True, - "variations": [{"value": val, "weight": 100, - "targets": [{"attribute": "key", "op": "in", "values": []}], - "userTarget": {"attribute": "key", "op": "in", "values": []}}, - {"value": False, "weight": 0, - "targets": [{"attribute": "key", "olikep": "in", "values": []}], - "userTarget": {"attribute": "key", "op": "in", "values": []}}], - "commitDate": "2015-09-08T21:24:16.712Z", - "creationDate": "2015-09-08T21:06:16.527Z", "version": 4}} - - -def user(name): - return { - u'key': name, - u'custom': { - u'bizzle': u'def' - } - } diff --git a/pytest.ini b/pytest.ini deleted file mode 100644 index b86adf8e..00000000 --- a/pytest.ini +++ /dev/null @@ -1,3 +0,0 @@ -[pytest] -# enables pytest-twisted -twisted = 1 \ No newline at end of file diff --git a/setup.py b/setup.py index 6f534b7c..1094fa70 100644 --- a/setup.py +++ b/setup.py @@ -39,7 +39,7 @@ def run(self): setup( name='ldclient-py', - version='2.0.0-beta3', + version='2.0.0', author='Catamorphic Co.', author_email='team@catamorphic.com', packages=['ldclient'], diff --git a/test-requirements.txt b/test-requirements.txt index 1e455c0c..78aa772b 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -1,4 +1,3 @@ pytest>=2.8 -pytest-twisted==1.5 pytest-timeout>=1.0 redis>=2.10.5 diff --git a/testing/server_util.py b/testing/server_util.py deleted file mode 100644 index b2d3e629..00000000 --- a/testing/server_util.py +++ /dev/null @@ -1,158 +0,0 @@ -import json -import logging -from queue import Empty -import ssl -import threading - -try: - import queue as queuemod -except: - import Queue as queuemod - -try: - from SimpleHTTPServer import SimpleHTTPRequestHandler - # noinspection PyPep8Naming - import SocketServer as socketserver - import urlparse -except ImportError: - # noinspection PyUnresolvedReferences - from http.server import SimpleHTTPRequestHandler - # noinspection PyUnresolvedReferences - import socketserver - # noinspection PyUnresolvedReferences - from urllib import parse as urlparse - - -class TestServer(socketserver.TCPServer): - allow_reuse_address = True - - -class GenericServer: - - def __init__(self, host='localhost', use_ssl=False, port=None, cert_file="self_signed.crt", - key_file="self_signed.key"): - - self.get_paths = {} - self.post_paths = {} - self.raw_paths = {} - self.stopping = False - parent = self - - class CustomHandler(SimpleHTTPRequestHandler): - - def handle_request(self, paths): - # sort so that longest path wins - for path, handler in sorted(paths.items(), key=lambda item: len(item[0]), reverse=True): - if self.path.startswith(path): - handler(self) - return - self.send_response(404) - self.end_headers() - - def do_GET(self): - self.handle_request(parent.get_paths) - - # noinspection PyPep8Naming - def do_POST(self): - self.handle_request(parent.post_paths) - - self.httpd = TestServer( - ("0.0.0.0", port if port is not None else 0), CustomHandler) - port = port if port is not None else self.httpd.socket.getsockname()[1] - self.url = ("https://" if use_ssl else "http://") + host + ":%s" % port - self.port = port - logging.info("serving at port %s: %s" % (port, self.url)) - - if use_ssl: - self.httpd.socket = ssl.wrap_socket(self.httpd.socket, - certfile=cert_file, - keyfile=key_file, - server_side=True, - ssl_version=ssl.PROTOCOL_TLSv1) - self.start() - - def start(self): - self.stopping = False - httpd_thread = threading.Thread(target=self.httpd.serve_forever) - httpd_thread.setDaemon(True) - httpd_thread.start() - - def stop(self): - self.shutdown() - - def post_events(self): - q = queuemod.Queue() - - def do_nothing(handler): - handler.send_response(200) - handler.end_headers() - - self.post_paths["/api/events/bulk"] = do_nothing - self.post_paths["/bulk"] = do_nothing - return q - - def add_feature(self, data): - def handle(handler): - handler.send_response(200) - handler.send_header('Content-type', 'application/json') - handler.end_headers() - handler.wfile.write(json.dumps(data).encode('utf-8')) - - self.get("/api/eval/latest-features", handle) - - def get(self, path, func): - """ - Registers a handler function to be called when a GET request beginning with 'path' is made. - - :param path: The path prefix to listen on - :param func: The function to call. Should be a function that takes the querystring as a parameter. - """ - self.get_paths[path] = func - - def post(self, path, func): - """ - Registers a handler function to be called when a POST request beginning with 'path' is made. - - :param path: The path prefix to listen on - :param func: The function to call. Should be a function that takes the post body as a parameter. - """ - self.post_paths[path] = func - - def shutdown(self): - self.stopping = True - self.httpd.shutdown() - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - try: - self.shutdown() - finally: - pass - - -class SSEServer(GenericServer): - - def __init__(self, host='localhost', use_ssl=False, port=None, cert_file="self_signed.crt", - key_file="self_signed.key", queue=queuemod.Queue()): - GenericServer.__init__(self, host, use_ssl, port, cert_file, key_file) - - def feed_forever(handler): - handler.send_response(200) - handler.send_header( - 'Content-type', 'text/event-stream; charset=utf-8') - handler.end_headers() - while not self.stopping: - try: - event = queue.get(block=True, timeout=1) - """ :type: ldclient.twisted_sse.Event """ - if event: - lines = "event: {event}\ndata: {data}\n\n".format(event=event.event, - data=json.dumps(event.data)) - handler.wfile.write(lines.encode('utf-8')) - except Empty: - pass - - self.get_paths["/"] = feed_forever - self.queue = queue diff --git a/testing/twisted_util.py b/testing/twisted_util.py deleted file mode 100644 index 1bd1c778..00000000 --- a/testing/twisted_util.py +++ /dev/null @@ -1,29 +0,0 @@ -import time - -from twisted.internet import defer, reactor - - -@defer.inlineCallbacks -def wait_until(condition, timeout=5): - end_time = time.time() + timeout - - while True: - result = yield defer.maybeDeferred(condition) - if result: - defer.returnValue(condition) - elif time.time() > end_time: - raise Exception("Timeout waiting for {}".format( - condition.__name__)) # pragma: no cover - else: - d = defer.Deferred() - reactor.callLater(.1, d.callback, None) - yield d - - -def is_equal(f, val): - @defer.inlineCallbacks - def is_equal_eval(): - result = yield defer.maybeDeferred(f) - defer.returnValue(result == val) - - return is_equal_eval diff --git a/twisted-requirements.txt b/twisted-requirements.txt index 957f6c3f..e99d9e35 100644 --- a/twisted-requirements.txt +++ b/twisted-requirements.txt @@ -1,6 +1,5 @@ -cryptography>=1.4 -pyOpenSSL>=16.0.0 +cryptography>=1.0 +pyOpenSSL>=0.14 service_identity>=16.0 -twisted>=16.3.0 txredis>=2.4 -txrequests>=0.9.2 \ No newline at end of file +txrequests>=0.9.2 From 3bad5e84c48ca15378955262ee8934db2fa26c7d Mon Sep 17 00:00:00 2001 From: Dan Richelson Date: Tue, 16 Aug 2016 11:52:29 -0700 Subject: [PATCH 4/9] fix test --- testing/test_feature_store.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/testing/test_feature_store.py b/testing/test_feature_store.py index 96bb140c..0afa063e 100644 --- a/testing/test_feature_store.py +++ b/testing/test_feature_store.py @@ -66,30 +66,30 @@ def test_initialized(self, store): def test_get_existing_feature(self, store): store = self.base_initialized_store(store) expected = self.make_feature('foo', 10) - assert store.get('foo') == expected + assert store.get('foo', lambda x: x) == expected def test_get_nonexisting_feature(self, store): store = self.base_initialized_store(store) - assert store.get('biz') is None + assert store.get('biz', lambda x: x) is None def test_upsert_with_newer_version(self, store): store = self.base_initialized_store(store) new_ver = self.make_feature('foo', 11) store.upsert('foo', new_ver) - assert store.get('foo') == new_ver + assert store.get('foo', lambda x: x) == new_ver def test_upsert_with_older_version(self, store): store = self.base_initialized_store(store) new_ver = self.make_feature('foo', 9) expected = self.make_feature('foo', 10) store.upsert('foo', new_ver) - assert store.get('foo') == expected + assert store.get('foo', lambda x: x) == expected def test_upsert_with_new_feature(self, store): store = self.base_initialized_store(store) new_ver = self.make_feature('biz', 1) store.upsert('biz', new_ver) - assert store.get('biz') == new_ver + assert store.get('biz', lambda x: x) == new_ver def test_delete_with_newer_version(self, store): store = self.base_initialized_store(store) @@ -99,10 +99,10 @@ def test_delete_with_newer_version(self, store): def test_delete_unknown_feature(self, store): store = self.base_initialized_store(store) store.delete('biz', 11) - assert store.get('biz') is None + assert store.get('biz', lambda x: x) is None def test_delete_with_older_version(self, store): store = self.base_initialized_store(store) store.delete('foo', 9) expected = self.make_feature('foo', 10) - assert store.get('foo') == expected + assert store.get('foo', lambda x: x) == expected From c5dca8b6e77be8c146cdde9fa36f9c38485d8e49 Mon Sep 17 00:00:00 2001 From: Dan Richelson Date: Tue, 16 Aug 2016 11:53:07 -0700 Subject: [PATCH 5/9] fix test --- testing/test_feature_store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testing/test_feature_store.py b/testing/test_feature_store.py index 0afa063e..ef458986 100644 --- a/testing/test_feature_store.py +++ b/testing/test_feature_store.py @@ -94,7 +94,7 @@ def test_upsert_with_new_feature(self, store): def test_delete_with_newer_version(self, store): store = self.base_initialized_store(store) store.delete('foo', 11) - assert store.get('foo') is None + assert store.get('foo', lambda x: x) is None def test_delete_unknown_feature(self, store): store = self.base_initialized_store(store) From a01754c5404abb88bf68501a7ea14c1474c65e90 Mon Sep 17 00:00:00 2001 From: Dan Richelson Date: Tue, 16 Aug 2016 12:02:43 -0700 Subject: [PATCH 6/9] Remove twisted test --- testing/test_integration_twisted.py | 87 ----------------------------- 1 file changed, 87 deletions(-) delete mode 100644 testing/test_integration_twisted.py diff --git a/testing/test_integration_twisted.py b/testing/test_integration_twisted.py deleted file mode 100644 index 0972d00d..00000000 --- a/testing/test_integration_twisted.py +++ /dev/null @@ -1,87 +0,0 @@ -import logging -from ldclient import TwistedConfig, TwistedLDClient, LDClient -from ldclient.twisted_sse import Event -import pytest -from testing.server_util import SSEServer, GenericServer -from testing.twisted_util import wait_until, is_equal - -logging.basicConfig(level=logging.DEBUG) - - -@pytest.fixture() -def server(request): - server = GenericServer() - - def fin(): - server.shutdown() - - request.addfinalizer(fin) - return server - - -@pytest.fixture() -def stream(request): - server = SSEServer() - - def fin(): - server.shutdown() - - request.addfinalizer(fin) - return server - - -@pytest.inlineCallbacks -def test_toggle(server): - server.add_feature(feature("foo", "jim")['foo']) - client = TwistedLDClient("apikey", TwistedConfig(base_uri=server.url)) - yield wait_until(is_equal(lambda: client.toggle("foo", user('xyz'), "blah"), "jim")) - - -@pytest.inlineCallbacks -def test_sse_init(server, stream): - stream.queue.put(Event(event="put", data=feature("foo", "jim"))) - client = LDClient("apikey", TwistedConfig( - stream=True, base_uri=server.url, stream_uri=stream.url)) - yield wait_until(is_equal(lambda: client.is_initialized(), True)) - - -@pytest.inlineCallbacks -def test_sse_reconnect(server, stream): - server.post_events() - stream.queue.put(Event(event="put", data=feature("foo", "on"))) - client = LDClient("apikey", TwistedConfig( - stream=True, base_uri=server.url, stream_uri=stream.url)) - yield wait_until(is_equal(lambda: client.toggle("foo", user('xyz'), "blah"), "on")) - - stream.stop() - - yield wait_until(is_equal(lambda: client.toggle("foo", user('xyz'), "blah"), "on")) - - stream.start() - - stream.queue.put(Event(event="put", data=feature("foo", "jim"))) - client = LDClient("apikey", TwistedConfig( - stream=True, base_uri=server.url, stream_uri=stream.url)) - yield wait_until(is_equal(lambda: client.toggle("foo", user('xyz'), "blah"), "jim")) - - -def feature(key, val): - return { - key: {"name": "Feature {}".format(key), "key": key, "kind": "flag", "salt": "Zm9v", "on": val, - "variations": [{"value": val, "weight": 100, - "targets": [{"attribute": "key", "op": "in", "values": []}], - "userTarget": {"attribute": "key", "op": "in", "values": []}}, - {"value": False, "weight": 0, - "targets": [{"attribute": "key", "op": "in", "values": []}], - "userTarget": {"attribute": "key", "op": "in", "values": []}}], - "commitDate": "2015-09-08T21:24:16.712Z", - "creationDate": "2015-09-08T21:06:16.527Z", "version": 4}} - - -def user(name): - return { - u'key': name, - u'custom': { - u'bizzle': u'def' - } - } From 03278c6e547ac39fe565a7ec897a522637307107 Mon Sep 17 00:00:00 2001 From: Dan Richelson Date: Tue, 16 Aug 2016 12:09:04 -0700 Subject: [PATCH 7/9] Merge master --- MANIFEST.in | 1 + setup.py | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/MANIFEST.in b/MANIFEST.in index d3e3bd98..94847dd4 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,4 +1,5 @@ include requirements.txt include README.txt include test-requirements.txt +include twisted-requirements.txt include redis-requirements.txt \ No newline at end of file diff --git a/setup.py b/setup.py index 25f530b4..1094fa70 100644 --- a/setup.py +++ b/setup.py @@ -10,12 +10,15 @@ # parse_requirements() returns generator of pip.req.InstallRequirement objects install_reqs = parse_requirements('requirements.txt', session=uuid.uuid1()) test_reqs = parse_requirements('test-requirements.txt', session=uuid.uuid1()) +twisted_reqs = parse_requirements( + 'twisted-requirements.txt', session=uuid.uuid1()) redis_reqs = parse_requirements('redis-requirements.txt', session=uuid.uuid1()) # reqs is a list of requirement # e.g. ['django==1.5.1', 'mezzanine==1.4.6'] reqs = [str(ir.req) for ir in install_reqs] testreqs = [str(ir.req) for ir in test_reqs] +txreqs = [str(ir.req) for ir in twisted_reqs] redisreqs = [str(ir.req) for ir in redis_reqs] @@ -50,6 +53,7 @@ def run(self): 'Programming Language :: Python :: 2 :: Only', ], extras_require={ + "twisted": txreqs, "redis": redisreqs }, tests_require=testreqs, From e6e98a6b7691cf24a4fbab77c30548f9111eb8b6 Mon Sep 17 00:00:00 2001 From: Dan Richelson Date: Thu, 18 Aug 2016 11:47:43 -0700 Subject: [PATCH 8/9] Address PR comments --- ldclient/interfaces.py | 9 +++++---- ldclient/redis_feature_store.py | 4 ++-- ldclient/twisted_client.py | 4 ++-- ldclient/twisted_event_consumer.py | 2 +- 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/ldclient/interfaces.py b/ldclient/interfaces.py index fdc4d408..80ae7a8c 100644 --- a/ldclient/interfaces.py +++ b/ldclient/interfaces.py @@ -14,16 +14,17 @@ def get(self, key, callback): :param key: The feature key :type key: str :param callback: The function that accepts the feature data and returns the feature value - :type callback: function - :return: The feature value. None if not found + :type callback: Function that processes the feature flag once received. + :return: The result of executing callback. """ @abstractmethod def all(self, callback): """ Returns all feature flags and their data - - :rtype: dict[str, dict] + :param callback: The function that accepts the feature data and returns the feature value + :type callback: Function that processes the feature flags once received. + :rtype: The result of executing callback. """ @abstractmethod diff --git a/ldclient/redis_feature_store.py b/ldclient/redis_feature_store.py index 2ee9cc89..7be129e3 100644 --- a/ldclient/redis_feature_store.py +++ b/ldclient/redis_feature_store.py @@ -56,7 +56,7 @@ def all(self, callback): results[f['key']] = f return callback(results) - def get(self, key, callback): + def get(self, key, callback=lambda x: x): f = self._cache.get(key) if f is not None: # reset ttl @@ -112,7 +112,7 @@ def initialized(self): def upsert(self, key, feature): r = redis.Redis(connection_pool=self._pool) r.watch(self._features_key) - old = self.get(key, lambda x: x) + old = self.get(key) if old: if old['version'] >= feature['version']: r.unwatch() diff --git a/ldclient/twisted_client.py b/ldclient/twisted_client.py index 01ba3e90..90ce50dc 100644 --- a/ldclient/twisted_client.py +++ b/ldclient/twisted_client.py @@ -35,8 +35,8 @@ def cb(result): def _evaluate_internal(self, flag, user): def check_prereq_results(result): prereq_ok = True - for r in result: # r is a tuple of 2 booleans: (error, prereqMatches) - if r[0] is False or r[1] is False: + for (success, prereq_ok) in result: + if success is False or prereq_ok is False: prereq_ok = False if prereq_ok is True: diff --git a/ldclient/twisted_event_consumer.py b/ldclient/twisted_event_consumer.py index 286d8389..18b444d6 100644 --- a/ldclient/twisted_event_consumer.py +++ b/ldclient/twisted_event_consumer.py @@ -17,7 +17,7 @@ class TwistedEventConsumer(EventConsumer): def __init__(self, queue, sdk_key, config): self._queue = queue - """ @type: queue.Queue """ + """ :type: queue.Queue """ self._session = CacheControl(txrequests.Session()) """ :type: txrequests.Session """ From 9ce49fea232758c118ff657d4809153c440df534 Mon Sep 17 00:00:00 2001 From: Dan Richelson Date: Mon, 22 Aug 2016 18:49:43 -0700 Subject: [PATCH 9/9] Update version. Update Readme --- CHANGELOG.md | 7 +++++++ README.md | 27 ++++++++++++++++++++++++++- ldclient/version.py | 2 +- setup.py | 2 +- 4 files changed, 35 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cc9c77f5..e81931a8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,13 @@ All notable changes to the LaunchDarkly Python SDK will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org). +## [3.0.0] - 2016-08-22 +### Added +- Twisted support for LDD mode only. + +### Changed +- FeatureStore interface get() and all() methods now take an additional callback parameter. + ## [2.0.0] - 2016-08-10 ### Added - Support for multivariate feature flags. `variation` replaces `toggle` and can return a string, number, dict, or boolean value depending on how the flag is defined. diff --git a/README.md b/README.md index daa689d2..53073479 100644 --- a/README.md +++ b/README.md @@ -26,13 +26,38 @@ Your first feature flag ----------------------- 1. Create a new feature flag on your [dashboard](https://app.launchdarkly.com) -2. In your application code, use the feature's key to check wthether the flag is on for each user: +2. In your application code, use the feature's key to check whether the flag is on for each user: if client.variation("your.flag.key", {"key": "user@test.com"}, False): # application code to show the feature else: # the code to run if the feature is off +Twisted +------- +Twisted is supported for LDD mode only. To run in Twisted/LDD mode, + +1. Use this dependency: + + ``` + ldclient-py[twisted]==3.0.0 + ``` +2. Configure the client: + + ``` + feature_store = TwistedRedisFeatureStore(url='YOUR_REDIS_URL', redis_prefix="ldd-restwrapper", expiration=0) + ldclient.config.feature_store = feature_store + + ldclient.config = ldclient.Config( + use_ldd=use_ldd, + event_consumer_class=TwistedEventConsumer, + ) + ldclient.sdk_key = 'YOUR_SDK_KEY' + ``` +3. Get the client: + + ```client = ldclient.get()``` + Learn more ----------- diff --git a/ldclient/version.py b/ldclient/version.py index 21014090..ea9d6945 100644 --- a/ldclient/version.py +++ b/ldclient/version.py @@ -1 +1 @@ -VERSION = "2.0.0" +VERSION = "3.0.0" diff --git a/setup.py b/setup.py index 1094fa70..4ad357b5 100644 --- a/setup.py +++ b/setup.py @@ -39,7 +39,7 @@ def run(self): setup( name='ldclient-py', - version='2.0.0', + version='3.0.0', author='Catamorphic Co.', author_email='team@catamorphic.com', packages=['ldclient'],