diff --git a/ldclient/client.py b/ldclient/client.py index 22d63ea8..55adfbbe 100644 --- a/ldclient/client.py +++ b/ldclient/client.py @@ -4,7 +4,6 @@ import hmac import threading -import requests from builtins import object from ldclient.config import Config as Config @@ -42,7 +41,6 @@ def __init__(self, sdk_key=None, config=None, start_wait=5): self._config = config or Config.default() self._config._validate() - self._session = CacheControl(requests.Session()) self._event_processor = None self._lock = Lock() diff --git a/ldclient/event_processor.py b/ldclient/event_processor.py index 4dd4e46a..fa6061b4 100644 --- a/ldclient/event_processor.py +++ b/ldclient/event_processor.py @@ -5,7 +5,9 @@ import errno import jsonpickle from threading import Event, Lock, Thread +import six import time +import urllib3 # noinspection PyBroadException try: @@ -14,19 +16,17 @@ # noinspection PyUnresolvedReferences,PyPep8Naming import Queue as queue -import requests -from requests.packages.urllib3.exceptions import ProtocolError - -import six - from ldclient.event_summarizer import EventSummarizer from ldclient.fixed_thread_pool import FixedThreadPool from ldclient.lru_cache import SimpleLRUCache from ldclient.user_filter import UserFilter from ldclient.interfaces import EventProcessor from ldclient.repeating_timer import RepeatingTimer +from ldclient.util import UnsuccessfulResponseException from ldclient.util import _headers +from ldclient.util import create_http_pool_manager from ldclient.util import log +from ldclient.util import http_error_message, is_http_error_recoverable, throw_if_unsuccessful_response __MAX_FLUSH_THREADS__ = 5 @@ -144,8 +144,8 @@ def make_summary_event(self, summary): class EventPayloadSendTask(object): - def __init__(self, session, config, formatter, payload, response_fn): - self._session = session + def __init__(self, http, config, formatter, payload, response_fn): + self._http = http self._config = config self._formatter = formatter self._payload = payload @@ -154,15 +154,13 @@ def __init__(self, session, config, formatter, payload, response_fn): def run(self): try: output_events = self._formatter.make_output_events(self._payload.events, self._payload.summary) - resp = self._do_send(output_events, True) - if resp is not None: - self._response_fn(resp) + resp = self._do_send(output_events) except Exception: log.warning( 'Unhandled exception in event processor. Analytics events were not processed.', exc_info=True) - def _do_send(self, output_events, should_retry): + def _do_send(self, output_events): # noinspection PyBroadException try: json_body = jsonpickle.encode(output_events, unpicklable=False) @@ -170,27 +168,16 @@ def _do_send(self, output_events, should_retry): hdrs = _headers(self._config.sdk_key) hdrs['X-LaunchDarkly-Event-Schema'] = str(__CURRENT_EVENT_SCHEMA__) uri = self._config.events_uri - r = self._session.post(uri, + r = self._http.request('POST', uri, headers=hdrs, - timeout=(self._config.connect_timeout, self._config.read_timeout), - data=json_body) - r.raise_for_status() + timeout=urllib3.Timeout(connect=self._config.connect_timeout, read=self._config.read_timeout), + body=json_body, + retries=1) + self._response_fn(r) return r - except ProtocolError as e: - if e.args is not None and len(e.args) > 1 and e.args[1] is not None: - inner = e.args[1] - if inner.errno is not None and inner.errno == errno.ECONNRESET and should_retry: - log.warning( - 'ProtocolError exception caught while sending events. Retrying.') - self._do_send(output_events, False) - else: - log.warning( - 'Unhandled exception in event processor. Analytics events were not processed.', - exc_info=True) - except Exception: + except Exception as e: log.warning( - 'Unhandled exception in event processor. Analytics events were not processed.', - exc_info=True) + 'Unhandled exception in event processor. Analytics events were not processed. [%s]', e) FlushPayload = namedtuple('FlushPayload', ['events', 'summary']) @@ -224,11 +211,11 @@ def clear(self): class EventDispatcher(object): - def __init__(self, queue, config, session): + def __init__(self, queue, config, http_client): self._queue = queue self._config = config - self._session = requests.Session() if session is None else session - self._close_session = (session is None) # so we know whether to close it later + self._http = create_http_pool_manager(num_pools=1, verify_ssl=config.verify_ssl) if http_client is None else http_client + self._close_http = (http_client is None) # so we know whether to close it later self._disabled = False self._buffer = EventBuffer(config.events_max_pending) self._user_keys = SimpleLRUCache(config.user_keys_capacity) @@ -261,7 +248,6 @@ def _run_main_loop(self): return except Exception: log.error('Unhandled exception in event processor', exc_info=True) - self._session.close() def _process_event(self, event): if self._disabled: @@ -320,7 +306,7 @@ def _trigger_flush(self): return payload = self._buffer.get_payload() if len(payload.events) > 0 or len(payload.summary.counters) > 0: - task = EventPayloadSendTask(self._session, self._config, self._formatter, payload, + task = EventPayloadSendTask(self._http, self._config, self._formatter, payload, self._handle_response) if self._flush_workers.execute(task.run): # The events have been handed off to a flush worker; clear them from our buffer. @@ -330,26 +316,27 @@ def _trigger_flush(self): pass def _handle_response(self, r): - server_date_str = r.headers.get('Date') + server_date_str = r.getheader('Date') if server_date_str is not None: server_date = parsedate(server_date_str) if server_date is not None: timestamp = int(time.mktime(server_date) * 1000) self._last_known_past_time = timestamp - if r.status_code == 401: - log.error('Received 401 error, no further events will be posted since SDK key is invalid') - self._disabled = True - return + if r.status > 299: + log.error(http_error_message(r.status, "event delivery", "some events were dropped")) + if not is_http_error_recoverable(r.status): + self._disabled = True + return def _do_shutdown(self): self._flush_workers.stop() self._flush_workers.wait() - if self._close_session: - self._session.close() + if self._close_http: + self._http.clear() class DefaultEventProcessor(EventProcessor): - def __init__(self, config, session=None): + def __init__(self, config, http=None): self._queue = queue.Queue(config.events_max_pending) self._flush_timer = RepeatingTimer(config.flush_interval, self.flush) self._users_flush_timer = RepeatingTimer(config.user_keys_flush_interval, self._flush_users) @@ -357,7 +344,7 @@ def __init__(self, config, session=None): self._users_flush_timer.start() self._close_lock = Lock() self._closed = False - EventDispatcher(self._queue, config, session) + EventDispatcher(self._queue, config, http) def send_event(self, event): event['creationDate'] = int(time.time() * 1000) diff --git a/ldclient/feature_requester.py b/ldclient/feature_requester.py index c29d4d79..786c1708 100644 --- a/ldclient/feature_requester.py +++ b/ldclient/feature_requester.py @@ -1,52 +1,61 @@ from __future__ import absolute_import -import requests -from cachecontrol import CacheControl +from collections import namedtuple +import json +import urllib3 from ldclient.interfaces import FeatureRequester +from ldclient.util import UnsuccessfulResponseException from ldclient.util import _headers +from ldclient.util import create_http_pool_manager from ldclient.util import log +from ldclient.util import throw_if_unsuccessful_response from ldclient.versioned_data_kind import FEATURES, SEGMENTS LATEST_ALL_URI = '/sdk/latest-all' +CacheEntry = namedtuple('CacheEntry', ['data', 'etag']) + + class FeatureRequesterImpl(FeatureRequester): def __init__(self, config): - self._session_cache = CacheControl(requests.Session()) - self._session_no_cache = requests.Session() + self._cache = dict() + self._http = create_http_pool_manager(num_pools=1, verify_ssl=config.verify_ssl) self._config = config def get_all_data(self): - hdrs = _headers(self._config.sdk_key) - uri = self._config.base_uri + LATEST_ALL_URI - r = self._session_cache.get(uri, - headers=hdrs, - timeout=( - self._config.connect_timeout, - self._config.read_timeout)) - r.raise_for_status() - all_data = r.json() - log.debug("Get All flags response status:[%d] From cache?[%s] ETag:[%s]", - r.status_code, r.from_cache, r.headers.get('ETag')) + all_data = self._do_request(self._config.base_uri + LATEST_ALL_URI, True) return { FEATURES: all_data['flags'], SEGMENTS: all_data['segments'] } def get_one(self, kind, key): + return self._do_request(kind.request_api_path + '/' + key, False) + + def _do_request(self, uri, allow_cache): hdrs = _headers(self._config.sdk_key) - path = kind.request_api_path + '/' + key - uri = config.base_uri + path - log.debug("Getting %s from %s using uri: %s", key, kind['namespace'], uri) - r = self._session_no_cache.get(uri, - headers=hdrs, - timeout=( - self._config.connect_timeout, - self._config.read_timeout)) - r.raise_for_status() - obj = r.json() - log.debug("%s response status:[%d] key:[%s] version:[%d]", - path, r.status_code, key, segment.get("version")) - return obj + if allow_cache: + cache_entry = self._cache.get(uri) + if cache_entry is not None: + hdrs['If-None-Match'] = cache_entry.etag + r = self._http.request('GET', uri, + headers=hdrs, + timeout=urllib3.Timeout(connect=self._config.connect_timeout, read=self._config.read_timeout), + retries=1) + throw_if_unsuccessful_response(r) + if r.status == 304 and cache_entry is not None: + data = cache_entry.data + etag = cache_entry.etag + from_cache = True + else: + data = json.loads(r.data.decode('UTF-8')) + etag = r.getheader('ETag') + from_cache = False + if allow_cache and etag is not None: + self._cache[uri] = CacheEntry(data=data, etag=etag) + log.debug("%s response status:[%d] From cache? [%s] ETag:[%s]", + uri, r.status, from_cache, etag) + return data diff --git a/ldclient/polling.py b/ldclient/polling.py index 8efa5913..81881d49 100644 --- a/ldclient/polling.py +++ b/ldclient/polling.py @@ -2,7 +2,8 @@ from ldclient.interfaces import UpdateProcessor from ldclient.util import log -from requests import HTTPError +from ldclient.util import UnsuccessfulResponseException, http_error_message, is_http_error_recoverable + import time @@ -28,15 +29,15 @@ def run(self): if not self._ready.is_set() is True and self._store.initialized is True: log.info("PollingUpdateProcessor initialized ok") self._ready.set() - except HTTPError as e: - log.error('Received unexpected status code %d from polling request' % e.response.status_code) - if e.response.status_code == 401: - log.error('Received 401 error, no further polling requests will be made since SDK key is invalid') + except UnsuccessfulResponseException as e: + log.error(http_error_message(e.status, "polling request")) + if not is_http_error_recoverable(e.status): + self._ready.set() # if client is initializing, make it stop waiting; has no effect if already inited self.stop() break - except Exception: + except Exception as e: log.exception( - 'Error: Exception encountered when updating flags.') + 'Error: Exception encountered when updating flags. %s' % e) elapsed = time.time() - start_time if elapsed < self._config.poll_interval: diff --git a/ldclient/sse_client.py b/ldclient/sse_client.py index f0fc7260..c97eb2d4 100644 --- a/ldclient/sse_client.py +++ b/ldclient/sse_client.py @@ -6,7 +6,10 @@ import six -import requests +import urllib3 + +from ldclient.util import create_http_pool_manager +from ldclient.util import throw_if_unsuccessful_response # Inspired by: https://bitbucket.org/btubbs/sseclient/src/a47a380a3d7182a205c0f1d5eb470013ce796b4d/sseclient.py?at=default&fileviewer=file-view-default @@ -16,7 +19,8 @@ class SSEClient(object): - def __init__(self, url, last_id=None, retry=3000, connect_timeout=10, read_timeout=300, chunk_size=10000, session=None, **kwargs): + def __init__(self, url, last_id=None, retry=3000, connect_timeout=10, read_timeout=300, chunk_size=10000, + verify_ssl=False, http=None, **kwargs): self.url = url self.last_id = last_id self.retry = retry @@ -24,10 +28,10 @@ def __init__(self, url, last_id=None, retry=3000, connect_timeout=10, read_timeo self._read_timeout = read_timeout self._chunk_size = chunk_size - # Optional support for passing in a requests.Session() - self.session = session + # Optional support for passing in an HTTP client + self.http = create_http_pool_manager(num_pools=1, verify_ssl=verify_ssl) - # Any extra kwargs will be fed into the requests.get call later. + # Any extra kwargs will be fed into the request call later. self.requests_kwargs = kwargs # The SSE spec requires making requests with Cache-Control: nocache @@ -48,21 +52,22 @@ def _connect(self): self.requests_kwargs['headers']['Last-Event-ID'] = self.last_id # Use session if set. Otherwise fall back to requests module. - requester = self.session or requests - self.resp = requester.get( + self.resp = self.http.request( + 'GET', self.url, - stream=True, - timeout=(self._connect_timeout, self._read_timeout), + timeout=urllib3.Timeout(connect=self._connect_timeout, read=self._read_timeout), + preload_content=False, + retries=0, # caller is responsible for implementing appropriate retry semantics, e.g. backoff **self.requests_kwargs) # Raw readlines doesn't work because we may be missing newline characters until the next chunk # For some reason, we also need to specify a chunk size because stream=True doesn't seem to guarantee # that we get the newlines in a timeline manner - self.resp_file = self.resp.iter_content(chunk_size=self._chunk_size, decode_unicode=True) + self.resp_file = self.resp.stream(amt=self._chunk_size) # TODO: Ensure we're handling redirects. Might also stick the 'origin' # attribute on Events like the Javascript spec requires. - self.resp.raise_for_status() + throw_if_unsuccessful_response(self.resp) def _event_complete(self): return re.search(end_of_field, self.buf[len(self.buf)-self._chunk_size-10:]) is not None # Just search the last chunk plus a bit @@ -77,8 +82,8 @@ def __next__(self): # There are some bad cases where we don't always get a line: https://github.com/requests/requests/pull/2431 if not nextline: raise EOFError() - self.buf += nextline - except (StopIteration, requests.RequestException, EOFError) as e: + self.buf += nextline.decode("utf-8") + except (StopIteration, EOFError) as e: time.sleep(self.retry / 1000.0) self._connect() diff --git a/ldclient/streaming.py b/ldclient/streaming.py index 58356f34..89ef4faf 100644 --- a/ldclient/streaming.py +++ b/ldclient/streaming.py @@ -5,12 +5,11 @@ from threading import Thread import backoff -from requests import HTTPError import time from ldclient.interfaces import UpdateProcessor from ldclient.sse_client import SSEClient -from ldclient.util import _stream_headers, log +from ldclient.util import _stream_headers, log, UnsuccessfulResponseException, http_error_message, is_http_error_recoverable from ldclient.versioned_data_kind import FEATURES, SEGMENTS # allows for up to 5 minutes to elapse without any data sent across the stream. The heartbeats sent as comments on the @@ -49,34 +48,32 @@ def run(self): if message_ok is True and self._ready.is_set() is False: log.info("StreamingUpdateProcessor initialized ok.") self._ready.set() - except HTTPError as e: - log.error("Received unexpected status code %d for stream connection" % e.response.status_code) - if e.response.status_code == 401: - log.error("Received 401 error, no further streaming connection will be made since SDK key is invalid") + except UnsuccessfulResponseException as e: + log.error(http_error_message(e.status, "stream connection")) + if not is_http_error_recoverable(e.status): + self._ready.set() # if client is initializing, make it stop waiting; has no effect if already inited self.stop() break - else: - log.warning("Restarting stream connection after one second.") - except Exception: - log.warning("Caught exception. Restarting stream connection after one second.", - exc_info=True) + except Exception as e: + log.warning("Caught exception. Restarting stream connection after one second. %s" % e) + # no stacktrace here because, for a typical connection error, it'll just be a lengthy tour of urllib3 internals time.sleep(1) def _backoff_expo(): return backoff.expo(max_value=30) def should_not_retry(e): - return isinstance(e, HTTPError) and (e.response.status_code == 401) + return isinstance(e, UnsuccessfulResponseException) and (not is_http_error_recoverable(e.status)) @backoff.on_exception(_backoff_expo, BaseException, max_tries=None, jitter=backoff.full_jitter, giveup=should_not_retry) def _connect(self): return SSEClient( self._uri, - verify=self._config.verify_ssl, headers=_stream_headers(self._config.sdk_key), connect_timeout=self._config.connect_timeout, - read_timeout=stream_read_timeout) + read_timeout=stream_read_timeout, + verify_ssl=self._config.verify_ssl) def stop(self): log.info("Stopping StreamingUpdateProcessor") diff --git a/ldclient/util.py b/ldclient/util.py index 015f5ace..618a7d9e 100644 --- a/ldclient/util.py +++ b/ldclient/util.py @@ -1,7 +1,9 @@ from __future__ import division, with_statement, absolute_import +import certifi import logging import sys +import urllib3 from ldclient.version import VERSION @@ -66,3 +68,43 @@ def __init__(self, data='', event='message', event_id=None, retry=None): def __str__(self, *args, **kwargs): return self.data + + +class UnsuccessfulResponseException(Exception): + def __init__(self, status): + super(UnsuccessfulResponseException, self).__init__("HTTP error %d" % status) + self._status = status + + @property + def status(self): + return self._status + + +def create_http_pool_manager(num_pools=1, verify_ssl=False): + if not verify_ssl: + return urllib3.PoolManager(num_pools=num_pools) + return urllib3.PoolManager( + num_pools=num_pools, + cert_reqs='CERT_REQUIRED', + ca_certs=certifi.where() + ) + + +def throw_if_unsuccessful_response(resp): + if resp.status >= 400: + raise UnsuccessfulResponseException(resp.status) + + +def is_http_error_recoverable(status): + if status >= 400 and status < 500: + return (status == 400) or (status == 408) or (status == 429) # all other 4xx besides these are unrecoverable + return True # all other errors are recoverable + + +def http_error_message(status, context, retryable_message = "will retry"): + return "Received HTTP error %d%s for %s - %s" % ( + status, + " (invalid SDK key)" if (status == 401 or status == 403) else "", + context, + retryable_message if is_http_error_recoverable(status) else "giving up permanently" + ) diff --git a/requirements.txt b/requirements.txt index ebdbadf1..53593a2d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,8 +1,9 @@ backoff>=1.4.3 CacheControl>=0.12.3 -requests>=2.17.3 +certifi>=2018.4.16 future>=0.16.0 six>=1.10.0 pyRFC3339>=1.0 jsonpickle==0.9.3 semver>=2.7.9 +urllib3>=1.22.0 diff --git a/test-requirements.txt b/test-requirements.txt index a75fc427..ff54bd08 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -1,6 +1,5 @@ mock>=2.0.0 pytest>=2.8 -pytest-timeout>=1.0 redis>=2.10.5 coverage>=4.3.4,<4.4 pytest-cov>=2.4.0 diff --git a/testing/stub_util.py b/testing/stub_util.py new file mode 100644 index 00000000..07e5c2ec --- /dev/null +++ b/testing/stub_util.py @@ -0,0 +1,103 @@ +from email.utils import formatdate +from requests.structures import CaseInsensitiveDict + +from ldclient.interfaces import EventProcessor, FeatureRequester, UpdateProcessor + + +class MockEventProcessor(EventProcessor): + def __init__(self, *_): + self._running = False + self._events = [] + mock_event_processor = self + + def stop(self): + self._running = False + + def start(self): + self._running = True + + def is_alive(self): + return self._running + + def send_event(self, event): + self._events.append(event) + + def flush(self): + pass + +class MockFeatureRequester(FeatureRequester): + def __init__(self): + self.all_data = {} + self.exception = None + + def get_all_data(self): + if self.exception is not None: + raise self.exception + return self.all_data + + def get_one(self, kind, key): + pass + +class MockResponse(object): + def __init__(self, status, headers): + self._status = status + self._headers = headers + + @property + def status(self): + return self._status + + def getheader(self, name): + return self._headers.get(name.lower()) + +class MockHttp(object): + def __init__(self): + self._request_data = None + self._request_headers = None + self._response_status = 200 + self._server_time = None + + def request(self, method, uri, headers, timeout, body, retries): + self._request_headers = headers + self._request_data = body + resp_hdr = dict() + if self._server_time is not None: + resp_hdr['date'] = formatdate(self._server_time / 1000, localtime=False, usegmt=True) + return MockResponse(self._response_status, resp_hdr) + + def clear(self): + pass + + @property + def request_data(self): + return self._request_data + + @property + def request_headers(self): + return self._request_headers + + def set_response_status(self, status): + self._response_status = status + + def set_server_time(self, timestamp): + self._server_time = timestamp + + def reset(self): + self._request_headers = None + self._request_data = None + +class MockUpdateProcessor(UpdateProcessor): + def __init__(self, config, store, ready): + ready.set() + + def start(self): + pass + + def stop(self): + pass + + def is_alive(self): + return True + + def initialized(self): + return True diff --git a/testing/test_event_processor.py b/testing/test_event_processor.py index 6bee3cbd..f4ad9ab8 100644 --- a/testing/test_event_processor.py +++ b/testing/test_event_processor.py @@ -1,13 +1,12 @@ -from email.utils import formatdate import json import pytest -from requests.structures import CaseInsensitiveDict import time from ldclient.config import Config from ldclient.event_processor import DefaultEventProcessor - from ldclient.util import log +from testing.stub_util import MockResponse, MockHttp + default_config = Config() user = { @@ -20,65 +19,12 @@ } ep = None -mock_session = None - - -class MockResponse(object): - def __init__(self, status, headers): - self._status = status - self._headers = headers - - @property - def status_code(self): - return self._status - - @property - def headers(self): - return self._headers - - def raise_for_status(self): - pass - -class MockSession(object): - def __init__(self): - self._request_data = None - self._request_headers = None - self._response_status = 200 - self._server_time = None - - def post(self, uri, headers, timeout, data): - self._request_headers = headers - self._request_data = data - resp_hdr = CaseInsensitiveDict() - if self._server_time is not None: - resp_hdr['Date'] = formatdate(self._server_time / 1000, localtime=False, usegmt=True) - return MockResponse(self._response_status, resp_hdr) - - def close(self): - pass - - @property - def request_data(self): - return self._request_data - - @property - def request_headers(self): - return self._request_headers - - def set_response_status(self, status): - self._response_status = status - - def set_server_time(self, timestamp): - self._server_time = timestamp - - def clear(self): - self._request_headers = None - self._request_data = None +mock_http = None def setup_function(): - global mock_session - mock_session = MockSession() + global mock_http + mock_http = MockHttp() def teardown_function(): if ep is not None: @@ -86,7 +32,7 @@ def teardown_function(): def setup_processor(config): global ep - ep = DefaultEventProcessor(config, mock_session) + ep = DefaultEventProcessor(config, mock_http) def test_identify_event_is_queued(): @@ -271,7 +217,7 @@ def test_debug_mode_expires_based_on_client_time_if_client_time_is_later_than_se server_time = now() - 20000 # Send and flush an event we don't care about, just to set the last server time - mock_session.set_server_time(server_time) + mock_http.set_server_time(server_time) ep.send_event({ 'kind': 'identify', 'user': { 'key': 'otherUser' }}) flush_and_get_events() @@ -298,7 +244,7 @@ def test_debug_mode_expires_based_on_server_time_if_server_time_is_later_than_cl server_time = now() + 20000 # Send and flush an event we don't care about, just to set the last server time - mock_session.set_server_time(server_time) + mock_http.set_server_time(server_time) ep.send_event({ 'kind': 'identify', 'user': { 'key': 'otherUser' }}) flush_and_get_events() @@ -404,7 +350,7 @@ def test_nothing_is_sent_if_there_are_no_events(): setup_processor(Config()) ep.flush() ep._wait_until_inactive() - assert mock_session.request_data is None + assert mock_http.request_data is None def test_sdk_key_is_sent(): setup_processor(Config(sdk_key = 'SDK_KEY')) @@ -413,30 +359,58 @@ def test_sdk_key_is_sent(): ep.flush() ep._wait_until_inactive() - assert mock_session.request_headers.get('Authorization') is 'SDK_KEY' + assert mock_http.request_headers.get('Authorization') is 'SDK_KEY' def test_no_more_payloads_are_sent_after_401_error(): + verify_unrecoverable_http_error(401) + +def test_no_more_payloads_are_sent_after_403_error(): + verify_unrecoverable_http_error(403) + +def test_will_still_send_after_408_error(): + verify_recoverable_http_error(408) + +def test_will_still_send_after_429_error(): + verify_recoverable_http_error(429) + +def test_will_still_send_after_500_error(): + verify_recoverable_http_error(500) + +def verify_unrecoverable_http_error(status): setup_processor(Config(sdk_key = 'SDK_KEY')) - mock_session.set_response_status(401) + mock_http.set_response_status(status) + ep.send_event({ 'kind': 'identify', 'user': user }) + ep.flush() + ep._wait_until_inactive() + mock_http.reset() + ep.send_event({ 'kind': 'identify', 'user': user }) ep.flush() ep._wait_until_inactive() - mock_session.clear() + assert mock_http.request_data is None + +def verify_recoverable_http_error(status): + setup_processor(Config(sdk_key = 'SDK_KEY')) + mock_http.set_response_status(status) ep.send_event({ 'kind': 'identify', 'user': user }) ep.flush() ep._wait_until_inactive() - assert mock_session.request_data is None + mock_http.reset() + ep.send_event({ 'kind': 'identify', 'user': user }) + ep.flush() + ep._wait_until_inactive() + assert mock_http.request_data is not None def flush_and_get_events(): ep.flush() ep._wait_until_inactive() - if mock_session.request_data is None: + if mock_http.request_data is None: raise AssertionError('Expected to get an HTTP request but did not get one') else: - return json.loads(mock_session.request_data) + return json.loads(mock_http.request_data) def check_index_event(data, source, user): assert data['kind'] == 'index' diff --git a/testing/test_ldclient.py b/testing/test_ldclient.py index b05a0057..ce6ebdb5 100644 --- a/testing/test_ldclient.py +++ b/testing/test_ldclient.py @@ -5,6 +5,7 @@ from ldclient.interfaces import FeatureRequester, FeatureStore, UpdateProcessor from ldclient.versioned_data_kind import FEATURES import pytest +from testing.stub_util import MockEventProcessor, MockUpdateProcessor from testing.sync_util import wait_until try: @@ -13,45 +14,6 @@ import Queue as queue -class MockEventProcessor(object): - def __init__(self, *_): - self._running = False - self._events = [] - mock_event_processor = self - - def stop(self): - self._running = False - - def start(self): - self._running = True - - def is_alive(self): - return self._running - - def send_event(self, event): - self._events.append(event) - - def flush(self): - pass - - -class MockUpdateProcessor(UpdateProcessor): - def __init__(self, config, store, ready): - ready.set() - - def start(self): - pass - - def stop(self): - pass - - def is_alive(self): - return True - - def initialized(self): - return True - - client = LDClient(config=Config(base_uri="http://localhost:3000", event_processor_class = MockEventProcessor, update_processor_class = MockUpdateProcessor)) offline_client = LDClient(config= diff --git a/testing/test_polling_processor.py b/testing/test_polling_processor.py new file mode 100644 index 00000000..06bae21d --- /dev/null +++ b/testing/test_polling_processor.py @@ -0,0 +1,93 @@ +import pytest +import threading +import time + +from ldclient.config import Config +from ldclient.feature_store import InMemoryFeatureStore +from ldclient.interfaces import FeatureRequester +from ldclient.polling import PollingUpdateProcessor +from ldclient.util import UnsuccessfulResponseException +from ldclient.versioned_data_kind import FEATURES, SEGMENTS +from testing.stub_util import MockFeatureRequester, MockResponse + +config = Config() +pp = None +mock_requester = None +store = None +ready = None + + +def setup_function(): + global mock_requester, store, ready + mock_requester = MockFeatureRequester() + store = InMemoryFeatureStore() + ready = threading.Event() + +def teardown_function(): + if pp is not None: + pp.stop() + +def setup_processor(config): + global pp + pp = PollingUpdateProcessor(config, mock_requester, store, ready) + pp.start() + +def test_successful_request_puts_feature_data_in_store(): + flag = { + "key": "flagkey" + } + segment = { + "key": "segkey" + } + mock_requester.all_data = { + FEATURES: { + "flagkey": flag + }, + SEGMENTS: { + "segkey": segment + } + } + setup_processor(config) + ready.wait() + assert store.get(FEATURES, "flagkey", lambda x: x) == flag + assert store.get(SEGMENTS, "segkey", lambda x: x) == segment + assert store.initialized + assert pp.initialized() + +def test_general_connection_error_does_not_cause_immediate_failure(): + mock_requester.exception = Exception("bad") + start_time = time.time() + setup_processor(config) + ready.wait(0.3) + elapsed_time = time.time() - start_time + assert elapsed_time >= 0.2 + assert not pp.initialized() + +def test_http_401_error_causes_immediate_failure(): + verify_unrecoverable_http_error(401) + +def test_http_403_error_causes_immediate_failure(): + verify_unrecoverable_http_error(401) + +def test_http_408_error_does_not_cause_immediate_failure(): + verify_recoverable_http_error(408) + +def test_http_429_error_does_not_cause_immediate_failure(): + verify_recoverable_http_error(429) + +def test_http_500_error_does_not_cause_immediate_failure(): + verify_recoverable_http_error(500) + +def verify_unrecoverable_http_error(status): + mock_requester.exception = UnsuccessfulResponseException(status) + setup_processor(config) + finished = ready.wait(5.0) + assert finished + assert not pp.initialized() + +def verify_recoverable_http_error(status): + mock_requester.exception = UnsuccessfulResponseException(status) + setup_processor(config) + finished = ready.wait(0.2) + assert not finished + assert not pp.initialized()