Skip to content
This repository has been archived by the owner on Mar 28, 2019. It is now read-only.

Commit

Permalink
Notifications are now asynchronous
Browse files Browse the repository at this point in the history
  • Loading branch information
tarekziade authored and leplatrem committed Feb 15, 2016
1 parent c011f42 commit 5c8fa56
Show file tree
Hide file tree
Showing 11 changed files with 257 additions and 6 deletions.
3 changes: 2 additions & 1 deletion cliquet/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@
'cliquet.initialization.setup_authentication',
'cliquet.initialization.setup_backoff',
'cliquet.initialization.setup_statsd',
'cliquet.initialization.setup_listeners'
'cliquet.initialization.setup_listeners',
'cliquet.initialization.setup_workers'
),
'event_listeners': '',
'logging_renderer': 'cliquet.logs.ClassicLogRenderer',
Expand Down
7 changes: 7 additions & 0 deletions cliquet/initialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from cliquet import permission
from cliquet.logs import logger
from cliquet.events import ResourceRead, ResourceChanged, ACTIONS
from cliquet.workers import get_memory_workers

from pyramid.events import NewRequest, NewResponse
from pyramid.exceptions import ConfigurationError
Expand Down Expand Up @@ -463,6 +464,12 @@ def setup_listeners(config):
config.add_subscriber(listener, ResourceChanged, **options)


def setup_workers(config):
    """Attach the background workers pool to the application registry.

    The number of worker processes is read from the
    ``background.processes`` setting (defaults to ``1``) and the resulting
    pool is exposed as ``config.registry.workers``.

    :param config: configurator exposing ``get_settings()`` and ``registry``.
    """
    raw_count = config.get_settings().get('background.processes', 1)
    # Settings may come in as strings, hence the explicit conversion.
    config.registry.workers = get_memory_workers(int(raw_count))


def load_default_settings(config, default_settings):
"""Read settings provided in Paste ini file, set default values and
replace if defined as environment variable.
Expand Down
20 changes: 17 additions & 3 deletions cliquet/listeners/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,24 @@
from pyramid.threadlocal import get_current_registry


class ListenerBase(object):
def __init__(self, *args, **kwargs):
def _done(self, name, res_id, success, result):
pass

def __call__(self, event):
def _async_run(self, event):
workers = get_current_registry().workers
workers.apply_async('event', self._run, (event,), self._done)

def _run(self, event):
raise NotImplementedError()

def __call__(self, event, async=True):
"""
:param event: Incoming event
:param async: Run asynchronously, default: True
"""
raise NotImplementedError()
if async:
return self._async_run(event)
else:
# not used yet
return self._run(event) # pragma: no cover
2 changes: 1 addition & 1 deletion cliquet/listeners/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def __init__(self, client, listname, *args, **kwargs):
self._client = client
self.listname = listname

def __call__(self, event):
def _run(self, event): # pragma: no cover
try:
payload = json.dumps(event.payload)
except TypeError:
Expand Down
3 changes: 3 additions & 0 deletions cliquet/tests/support.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ def __init__(self, *args, **kwargs):
self.storage = self.app.app.registry.storage
self.cache = self.app.app.registry.cache
self.permission = self.app.app.registry.permission
self.workers = self.app.app.registry.workers

self.headers = {
'Content-Type': 'application/json',
'Authorization': 'Basic bWF0OjE='
Expand Down Expand Up @@ -122,6 +124,7 @@ def tearDown(self):
self.storage.flush()
self.cache.flush()
self.permission.flush()
self.workers.close()


class ThreadMixin(object):
Expand Down
9 changes: 8 additions & 1 deletion cliquet/tests/test_listeners.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import uuid
from contextlib import contextmanager
from datetime import datetime
import time

import mock
from pyramid import testing
Expand All @@ -27,6 +28,7 @@ def make_app(self, extra_settings={}):
settings.update(**extra_settings)
config = testing.setUp(settings=settings)
config.commit()
initialization.setup_workers(config)
initialization.setup_listeners(config)
return config

Expand Down Expand Up @@ -151,6 +153,8 @@ def setUp(self):
self.config = testing.setUp()
self.config.add_settings({'events_pool_size': 1,
'events_url': 'redis://localhost:6379/0'})
initialization.setup_workers(self.config)
self.config.commit()
self._redis = create_from_config(self.config, prefix='events_')
self._size = 0

Expand All @@ -163,6 +167,9 @@ def has_redis_changed(self):
def notify(self, event):
    """Publish *event* and block until background listeners are done.

    The current Redis state is saved first (``_save_redis``), the event is
    sent through the registry, and the method then polls the workers until
    no listener task is left in progress.

    :param event: the event object handed to ``registry.notify``.
    """
    self._save_redis()
    self.config.registry.notify(event)
    # Listeners submit their work under the task name 'event'. Polling any
    # other name always yields an empty list, so the loop would never wait.
    while self.config.registry.workers.in_progress('event'):
        time.sleep(.1)
    # Small grace period after the last task is reported as finished.
    time.sleep(.1)

@contextmanager
def redis_listening(self):
Expand Down Expand Up @@ -214,4 +221,4 @@ class ListenerBaseTest(unittest.TestCase):
def test_not_implemented(self):
# make sure we can't use the base listener
listener = ListenerBase()
self.assertRaises(NotImplementedError, listener, object())
self.assertRaises(NotImplementedError, listener._run, object())
68 changes: 68 additions & 0 deletions cliquet/tests/test_workers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import unittest
import time
from cliquet.workers import MemoryWorkers


def boom():
    """Always fail; used as a task that is expected to raise in a worker."""
    failure = Exception('ok')
    raise failure


class TestMemoryWorkers(unittest.TestCase):
    # Exercises MemoryWorkers against a real pool of one process. Several
    # tests rely on short sleeps to let the background task complete.

    def setUp(self):
        self.workers = MemoryWorkers(size=1)

    def tearDown(self):
        self.workers.close()

    def test_async(self):
        # A submitted task is listed as in progress until it completes.
        pool = self.workers
        pool.apply_async('some-sleep', time.sleep, (.2,))
        self.assertEqual(len(pool.in_progress('some-sleep')), 1)
        time.sleep(.3)
        self.assertEqual(pool.in_progress('some-sleep'), [])

    def test_async_fails(self):
        # A failing task is stored as (False, <traceback text>).
        task_id = self.workers.apply_async('exc', boom)
        time.sleep(.2)
        success, output = self.workers.get_result(task_id)
        self.assertFalse(success)
        self.assertIn('Traceback', output)

    def test_initialize(self):
        # Re-initializing an open pool switches to the requested size.
        self.workers.initialize(2)
        self.assertEqual(self.workers.size, 2)

    def test_keyboardinterrupt(self):
        # An interrupt raised while submitting must leave the pool closed.
        def interrupt(*args, **kwargs):
            raise KeyboardInterrupt()

        self.workers._pool.apply_async = interrupt
        self.workers.apply_async('ok', object())
        self.assertTrue(self.workers.closed)

    def test_result_size(self):
        # make sure the results don't grow indefinitely
        def noop(value):
            return value

        submitted = []
        for index in range(101):
            submitted.append(
                self.workers.apply_async('noop', noop, (index,)))

        while self.workers.in_progress('noop'):
            time.sleep(.1)
        time.sleep(.1)

        # one should be gone
        found = 0
        missing = 0
        for task_id in submitted:
            try:
                self.workers.get_result(task_id)
            except KeyError:
                missing += 1
            else:
                found += 1

        self.assertEqual(found, 100)
        self.assertEqual(missing, 1)
125 changes: 125 additions & 0 deletions cliquet/workers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import os
import signal
import traceback
from uuid import uuid4
from multiprocessing import Pool
from functools import partial
from collections import defaultdict, OrderedDict

try: # pragma: no cover
# until dill works with pypy, let's use plain Pickle.
# https://github.com/uqfoundation/dill/issues/73

# we have, however, to implement the pickling of locks
# because of the Redis listener.
#
# The code below was inspired from dill
import __pypy__ # NOQA
from pickle import loads, dumps, Pickler, UnpicklingError
from thread import LockType

def _create_lock(locked, *args):
from threading import Lock
lock = Lock()
if locked:
if not lock.acquire(False):
raise UnpicklingError("Cannot acquire lock")
return lock

def _save_lock(pickler, obj):
pickler.save_reduce(_create_lock, (obj.locked(),), obj=obj)

Pickler.dispatch[LockType] = _save_lock
except ImportError: # pragma: no cover
from dill import loads, dumps


def _run(dumped):  # pragma: no cover
    """Execute a serialized task inside a worker process.

    :param dumped: serialized ``(func, args)`` pair, as produced by
        ``dumps((func, args))`` on the submitting side.
    :returns: ``(True, result)`` when the call succeeds, or
        ``(False, traceback_text)`` when deserializing the payload or
        calling the function raises.
    """
    try:
        # Deserialize inside the ``try`` as well: an error escaping this
        # function would not be reported through the pool's success
        # callback, so the task would never be marked as finished.
        func, args = loads(dumped)
        result = func(*args)
    except Exception:
        return False, traceback.format_exc()
    return True, result


class MemoryWorkers(object):
    """Pool of background processes with in-memory task bookkeeping.

    Tasks are submitted with :meth:`apply_async` under a free-form *name*.
    Until a task completes, its id is listed by :meth:`in_progress`;
    afterwards its ``(success, result)`` pair is available through
    :meth:`get_result`. Only the most recent ``result_size_limit`` results
    are kept.
    """

    def __init__(self, size=1, result_size_limit=100):
        """
        :param size: number of worker processes.
        :param result_size_limit: maximum number of stored results.
        """
        # Start "closed" so that initialize() does not try to close a pool
        # that does not exist yet.
        self.closed = True
        self.initialize(size, result_size_limit)

    def initialize(self, size=1, result_size_limit=100):
        """(Re)create the pool, closing the current one if it is open.

        All stored results and in-progress bookkeeping are reset.

        :param size: number of worker processes.
        :param result_size_limit: maximum number of stored results.
        """
        if not self.closed:
            self.close()
        self.result_size_limit = result_size_limit
        self.size = size
        self._results = OrderedDict()
        self._in_progress = defaultdict(list)
        # SIGINT is ignored while the pool is being created and the previous
        # handler is restored afterwards, whatever happens.
        handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
        try:
            self._pool = Pool(self.size, initializer=self._init_proc,
                              initargs=(os.environ,))
        finally:
            signal.signal(signal.SIGINT, handler)
        self.closed = False

    def get_result(self, res_id):
        """Return the ``(success, result)`` pair stored for *res_id*.

        :raises KeyError: if the result is unknown, not ready yet, or has
            been evicted.
        """
        return self._results[res_id]

    def _store_result(self, name, res_id, res, callback=None):
        """Record the outcome of a finished task (pool callback).

        :param name: name the task was submitted under.
        :param res_id: id returned by :meth:`apply_async`.
        :param res: ``(success, result)`` pair produced by the worker.
        :param callback: optional callable invoked as
            ``callback(name, res_id, success, result)``.
        """
        # Make room by dropping the *oldest* results first. The emptiness
        # check keeps a limit of zero from popping an empty mapping.
        while self._results and len(self._results) >= self.result_size_limit:
            self._results.popitem(last=False)

        self._in_progress[name].remove(res_id)
        success, result = res
        self._results[res_id] = res
        if not success:
            # On failure ``result`` holds the formatted traceback.
            from cliquet import logger
            logger.error(result)

        if callback is not None:  # pragma: no cover
            callback(name, res_id, success, result)

    def _init_proc(self, environ):  # pragma: no cover
        """Worker initializer: copy *environ* in and ignore SIGINT."""
        os.environ.update(environ)
        signal.signal(signal.SIGINT, signal.SIG_IGN)

    def in_progress(self, name):
        """Return the list of task ids submitted under *name* and not done.

        The returned list is the internal one, not a copy.
        """
        return self._in_progress[name]

    def close(self):
        """Stop accepting tasks and wait for the workers to exit."""
        self._pool.close()
        self._pool.join()
        self.closed = True

    def apply_async(self, name, func, args=None, callback=None):
        """Submit ``func(*args)`` for execution in the background.

        :param name: free-form task name used to group in-progress ids.
        :param func: callable to run; it is serialized with ``dumps``.
        :param args: positional arguments for *func* (default: none).
        :param callback: optional callable invoked once the task is done,
            as ``callback(name, res_id, success, result)``.
        :returns: the id of the task, usable with :meth:`get_result`.
        """
        if args is None:
            args = tuple()
        res_id = str(uuid4())
        self._in_progress[name].append(res_id)
        async_callback = partial(self._store_result, name, res_id,
                                 callback=callback)
        cmd = partial(_run, dumps((func, args)))
        try:
            self._pool.apply_async(cmd, callback=async_callback)
        except KeyboardInterrupt:
            # The pool is torn down, so this task will never complete:
            # do not leave it listed as in progress forever.
            pending = self._in_progress[name]
            if res_id in pending:
                pending.remove(res_id)
            self._pool.terminate()
            self._pool.join()
            self.closed = True

        return res_id


# One MemoryWorkers instance per process id.
_WORKERS_PER_PROCESS = {}


def get_memory_workers(size=1):
    """Return the workers pool bound to the current process.

    A pool is created the first time a process asks for one. On later
    calls the same pool is returned; it is re-initialized with *size* only
    when it has been closed in the meantime.

    :param size: number of worker processes for a new or reopened pool.
    """
    current_pid = os.getpid()
    pool = _WORKERS_PER_PROCESS.get(current_pid)
    if pool is None:
        pool = MemoryWorkers(size)
        _WORKERS_PER_PROCESS[current_pid] = pool
    elif pool.closed:
        pool.initialize(size)
    return pool
24 changes: 24 additions & 0 deletions cliquet_docs/reference/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,30 @@ permission and the storage backend.
an empty string to disable it.


Background Processes
====================

Cliquet uses a pool of processes to run some tasks in the background, to avoid
blocking the incoming requests when the task to perform does not impact
the response, like asynchronous notifications.

By default, there's a single process worker running in the background,
but you can change the value with **background.processes**.


.. code-block:: ini

    cliquet.background.processes = 4

One process is enough if the only task being done in the background is
notifying Redis. But if you start to do longer work in your notifications,
which are triggered by every incoming request, you should raise the number
of workers. There's no magic formula and you should tweak the value
to find the best compromise.



Deployment
==========

Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ zope.deprecation==4.1.2
zope.interface==4.1.3
zope.sqlalchemy==0.7.6
enum34==1.1.2
dill==0.2.5
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
'six',
'structlog',
'enum34',
'dill'
]

if installed_with_pypy:
Expand Down

0 comments on commit 5c8fa56

Please sign in to comment.