From 5c8fa56a5bf9039d3798d2831ec2de2ab196936b Mon Sep 17 00:00:00 2001 From: Tarek Ziade Date: Mon, 8 Feb 2016 13:47:47 +0100 Subject: [PATCH] Notifications are now asynchronous --- cliquet/__init__.py | 3 +- cliquet/initialization.py | 7 ++ cliquet/listeners/__init__.py | 20 +++- cliquet/listeners/redis.py | 2 +- cliquet/tests/support.py | 3 + cliquet/tests/test_listeners.py | 9 +- cliquet/tests/test_workers.py | 68 ++++++++++++ cliquet/workers.py | 125 +++++++++++++++++++++++ cliquet_docs/reference/configuration.rst | 24 +++++ requirements.txt | 1 + setup.py | 1 + 11 files changed, 257 insertions(+), 6 deletions(-) create mode 100644 cliquet/tests/test_workers.py create mode 100644 cliquet/workers.py diff --git a/cliquet/__init__.py b/cliquet/__init__.py index 5751c46b..89a97928 100644 --- a/cliquet/__init__.py +++ b/cliquet/__init__.py @@ -55,7 +55,8 @@ 'cliquet.initialization.setup_authentication', 'cliquet.initialization.setup_backoff', 'cliquet.initialization.setup_statsd', - 'cliquet.initialization.setup_listeners' + 'cliquet.initialization.setup_listeners', + 'cliquet.initialization.setup_workers' ), 'event_listeners': '', 'logging_renderer': 'cliquet.logs.ClassicLogRenderer', diff --git a/cliquet/initialization.py b/cliquet/initialization.py index 0c3fb70a..6d3d0dd4 100644 --- a/cliquet/initialization.py +++ b/cliquet/initialization.py @@ -25,6 +25,7 @@ from cliquet import permission from cliquet.logs import logger from cliquet.events import ResourceRead, ResourceChanged, ACTIONS +from cliquet.workers import get_memory_workers from pyramid.events import NewRequest, NewResponse from pyramid.exceptions import ConfigurationError @@ -463,6 +464,12 @@ def setup_listeners(config): config.add_subscriber(listener, ResourceChanged, **options) +def setup_workers(config): + settings = config.get_settings() + num_workers = int(settings.get('background.processes', 1)) + config.registry.workers = get_memory_workers(num_workers) + + def load_default_settings(config, default_settings): """Read settings provided in Paste ini file, set default values and replace if defined as environment variable. diff --git a/cliquet/listeners/__init__.py b/cliquet/listeners/__init__.py index 85002092..779df4e3 100644 --- a/cliquet/listeners/__init__.py +++ b/cliquet/listeners/__init__.py @@ -1,10 +1,24 @@ +from pyramid.threadlocal import get_current_registry + class ListenerBase(object): - def __init__(self, *args, **kwargs): + def _done(self, name, res_id, success, result): pass - def __call__(self, event): + def _async_run(self, event): + workers = get_current_registry().workers + workers.apply_async('event', self._run, (event,), self._done) + + def _run(self, event): + raise NotImplementedError() + + def __call__(self, event, async=True): """ :param event: Incoming event + :param async: Run asynchronously, default: True """ - raise NotImplementedError() + if async: + return self._async_run(event) + else: + # not used yet + return self._run(event) # pragma: no cover diff --git a/cliquet/listeners/redis.py b/cliquet/listeners/redis.py index 402f2cd3..0f8eced1 100644 --- a/cliquet/listeners/redis.py +++ b/cliquet/listeners/redis.py @@ -19,7 +19,7 @@ def __init__(self, client, listname, *args, **kwargs): self._client = client self.listname = listname - def __call__(self, event): + def _run(self, event): # pragma: no cover try: payload = json.dumps(event.payload) except TypeError: diff --git a/cliquet/tests/support.py b/cliquet/tests/support.py index 3075e84e..bc7ba9a9 100644 --- a/cliquet/tests/support.py +++ b/cliquet/tests/support.py @@ -84,6 +84,8 @@ def __init__(self, *args, **kwargs): self.storage = self.app.app.registry.storage self.cache = self.app.app.registry.cache self.permission = self.app.app.registry.permission + self.workers = self.app.app.registry.workers + self.headers = { 'Content-Type': 'application/json', 'Authorization': 'Basic bWF0OjE=' @@ -122,6 +124,7 @@ def tearDown(self): self.storage.flush() self.cache.flush() self.permission.flush() + self.workers.close() class ThreadMixin(object): diff --git a/cliquet/tests/test_listeners.py b/cliquet/tests/test_listeners.py index b2ae3066..9c26d217 100644 --- a/cliquet/tests/test_listeners.py +++ b/cliquet/tests/test_listeners.py @@ -3,6 +3,7 @@ import uuid from contextlib import contextmanager from datetime import datetime +import time import mock from pyramid import testing @@ -27,6 +28,7 @@ def make_app(self, extra_settings={}): settings.update(**extra_settings) config = testing.setUp(settings=settings) config.commit() + initialization.setup_workers(config) initialization.setup_listeners(config) return config @@ -151,6 +153,8 @@ def setUp(self): self.config = testing.setUp() self.config.add_settings({'events_pool_size': 1, 'events_url': 'redis://localhost:6379/0'}) + initialization.setup_workers(self.config) + self.config.commit() self._redis = create_from_config(self.config, prefix='events_') self._size = 0 @@ -163,6 +167,9 @@ def has_redis_changed(self): def notify(self, event): self._save_redis() self.config.registry.notify(event) + while self.config.registry.workers.in_progress('events'): + time.sleep(.1) + time.sleep(.1) @contextmanager def redis_listening(self): @@ -214,4 +221,4 @@ class ListenerBaseTest(unittest.TestCase): def test_not_implemented(self): # make sure we can't use the base listener listener = ListenerBase() - self.assertRaises(NotImplementedError, listener, object()) + self.assertRaises(NotImplementedError, listener._run, object()) diff --git a/cliquet/tests/test_workers.py b/cliquet/tests/test_workers.py new file mode 100644 index 00000000..d7033445 --- /dev/null +++ b/cliquet/tests/test_workers.py @@ -0,0 +1,68 @@ +import unittest +import time +from cliquet.workers import MemoryWorkers + + +def boom(): + raise Exception('ok') + + +class TestMemoryWorkers(unittest.TestCase): + def setUp(self): + self.workers = MemoryWorkers(size=1) + + def tearDown(self): + self.workers.close() + + def test_async(self): + workers = self.workers + workers.apply_async('some-sleep', time.sleep, (.2,)) + pids = workers.in_progress('some-sleep') + self.assertEqual(len(pids), 1) + time.sleep(.3) + self.assertEqual(workers.in_progress('some-sleep'), []) + + def test_async_fails(self): + workers = self.workers + res_id = workers.apply_async('exc', boom) + time.sleep(.2) + res = workers.get_result(res_id) + self.assertFalse(res[0]) + self.assertTrue('Traceback' in res[1]) + + def test_initialize(self): + workers = self.workers + workers.initialize(2) + self.assertEqual(workers.size, 2) + + def test_keyboardinterrupt(self): + def _break(*args, **kw): + raise KeyboardInterrupt() + + self.workers._pool.apply_async = _break + self.workers.apply_async('ok', object()) + self.assertTrue(self.workers.closed) + + def test_result_size(self): + # make sure the results don't grow indefinitely + def noop(num): + return num + + res_ids = [self.workers.apply_async('noop', noop, (i,)) + for i in range(101)] + + while self.workers.in_progress('noop'): + time.sleep(.1) + time.sleep(.1) + + # one should be gone + counter = error = 0 + for res_id in res_ids: + try: + self.workers.get_result(res_id) + counter += 1 + except KeyError: + error += 1 + + self.assertEqual(counter, 100) + self.assertEqual(error, 1) diff --git a/cliquet/workers.py b/cliquet/workers.py new file mode 100644 index 00000000..0f191620 --- /dev/null +++ b/cliquet/workers.py @@ -0,0 +1,125 @@ +import os +import signal +import traceback +from uuid import uuid4 +from multiprocessing import Pool +from functools import partial +from collections import defaultdict, OrderedDict + +try: # pragma: no cover + # until dill works with pypy, let's use plain Pickle. + # https://github.com/uqfoundation/dill/issues/73 + + # we have, however, to implememt the pickling on locks + # because of the Redis listener. + # + # The code below was inspired from dill + import __pypy__ # NOQA + from pickle import loads, dumps, Pickler, UnpicklingError + from thread import LockType + + def _create_lock(locked, *args): + from threading import Lock + lock = Lock() + if locked: + if not lock.acquire(False): + raise UnpicklingError("Cannot acquire lock") + return lock + + def _save_lock(pickler, obj): + pickler.save_reduce(_create_lock, (obj.locked(),), obj=obj) + + Pickler.dispatch[LockType] = _save_lock +except ImportError: # pragma: no cover + from dill import loads, dumps + + +def _run(dumped): # pragma: no cover + func, args = loads(dumped) + try: + result = func(*args) + except Exception: + return False, traceback.format_exc() + return True, result + + +class MemoryWorkers(object): + def __init__(self, size=1, result_size_limit=100): + self.closed = True + self.initialize(size, result_size_limit) + + def initialize(self, size=1, result_size_limit=100): + if not self.closed: + self.close() + self.result_size_limit = 100 + self.size = size + self._results = OrderedDict() + self._in_progress = defaultdict(list) + handler = signal.signal(signal.SIGINT, signal.SIG_IGN) + try: + self._pool = Pool(self.size, initializer=self._init_proc, + initargs=(os.environ,)) + finally: + signal.signal(signal.SIGINT, handler) + self.closed = False + + def get_result(self, res_id): + return self._results[res_id] + + def _store_result(self, name, res_id, res, callback=None): + while len(self._results) >= self.result_size_limit: + self._results.popitem() + + self._in_progress[name].remove(res_id) + success, result = res + self._results[res_id] = res + if not success: + from cliquet import logger + logger.error(result) + + if callback is not None: # pragma: no cover + callback(name, res_id, success, result) + + def _init_proc(self, environ): # pragma: no cover + os.environ.update(environ) + signal.signal(signal.SIGINT, signal.SIG_IGN) + + def in_progress(self, name): + return self._in_progress[name] + + def close(self): + self._pool.close() + self._pool.join() + self.closed = True + + def apply_async(self, name, func, args=None, callback=None): + if args is None: + args = tuple() + res_id = str(uuid4()) + self._in_progress[name].append(res_id) + async_callback = partial(self._store_result, name, res_id, + callback=callback) + cmd = partial(_run, dumps((func, args))) + try: + self._pool.apply_async(cmd, callback=async_callback) + except KeyboardInterrupt: + self._pool.terminate() + self._pool.join() + self.closed = True + + return res_id + + +_WORKERS_PER_PROCESS = {} + + +def get_memory_workers(size=1): + pid = os.getpid() + if pid in _WORKERS_PER_PROCESS: + workers = _WORKERS_PER_PROCESS[pid] + if workers.closed: + workers.initialize(size) + else: + _WORKERS_PER_PROCESS[pid] = workers = MemoryWorkers(size) + + return workers diff --git a/cliquet_docs/reference/configuration.rst b/cliquet_docs/reference/configuration.rst index 47bd26cf..f11bf3d3 100644 --- a/cliquet_docs/reference/configuration.rst +++ b/cliquet_docs/reference/configuration.rst @@ -103,6 +103,30 @@ permission and the storage backend. an empty string to disable it. +Background Processes +==================== + +Cliquet uses a pool of processes to run some background processes to avoif +blocking the incoming requests when the task to perform is not impacting +the response, like asynchronous notifications. + +By default, there's a single process worker running in the background, +but you can change the value with **background.processes**. + + +.. code-block:: ini + + cliquet.background.processes = 4 + + +One process is enough if the only task being done in the background is +notifying redis. But if you start to do longer work in your notifications, +that are triggered by every incoming requests, you should raise the number +of workers. There's no magic formula and you should tweak the value +to find the best compromise. + + + Deployment ========== diff --git a/requirements.txt b/requirements.txt index 582024e9..888d1777 100644 --- a/requirements.txt +++ b/requirements.txt @@ -29,3 +29,4 @@ zope.deprecation==4.1.2 zope.interface==4.1.3 zope.sqlalchemy==0.7.6 enum34==1.1.2 +dill==0.2.5 diff --git a/setup.py b/setup.py index 8b41eade..09dfffd5 100644 --- a/setup.py +++ b/setup.py @@ -28,6 +28,7 @@ 'six', 'structlog', 'enum34', + 'dill' ] if installed_with_pypy: