Skip to content
This repository has been archived by the owner on Mar 28, 2019. It is now read-only.

Run listeners using subprocess running in background #647

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cliquet/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

DEFAULT_SETTINGS = {
'backoff': None,
'background.workers': 'cliquet.workers.memory',
'background.processes': 1,
'batch_max_requests': 25,
'cache_backend': '',
'cache_url': '',
Expand Down Expand Up @@ -55,6 +57,7 @@
'cliquet.initialization.setup_authentication',
'cliquet.initialization.setup_backoff',
'cliquet.initialization.setup_statsd',
'cliquet.initialization.setup_workers',
'cliquet.initialization.setup_listeners',
'cliquet.events.setup_transaction_hook',
),
Expand Down
59 changes: 47 additions & 12 deletions cliquet/initialization.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import warnings
from datetime import datetime
from dateutil import parser as dateparser
from functools import partial

import requests
import structlog
Expand All @@ -18,13 +19,15 @@

import cliquet
from cliquet import errors
from cliquet import events
from cliquet import utils
from cliquet import statsd
from cliquet import cache
from cliquet import storage
from cliquet import permission
from cliquet import workers
from cliquet.logs import logger
from cliquet.events import ResourceRead, ResourceChanged, ACTIONS
from cliquet.listeners import async_listener

from pyramid.events import NewRequest, NewResponse
from pyramid.exceptions import ConfigurationError
Expand Down Expand Up @@ -361,7 +364,7 @@ def on_new_response(event):

class EventActionFilter(object):
def __init__(self, actions, config):
actions = ACTIONS.from_string_list(actions)
actions = events.ACTIONS.from_string_list(actions)
self.actions = [action.value for action in actions]

def phash(self):
Expand Down Expand Up @@ -389,7 +392,9 @@ def setup_listeners(config):
config.add_subscriber_predicate('for_actions', EventActionFilter)
config.add_subscriber_predicate('for_resources', EventResourceFilter)

write_actions = (ACTIONS.CREATE, ACTIONS.UPDATE, ACTIONS.DELETE)
write_actions = (events.ACTIONS.CREATE,
events.ACTIONS.UPDATE,
events.ACTIONS.DELETE)
settings = config.get_settings()
listeners = aslist(settings['event_listeners'])

Expand All @@ -405,27 +410,57 @@ def setup_listeners(config):
listener_mod = config.maybe_dotted(settings[prefix + 'use'])
listener = listener_mod.load_from_config(config, prefix)

# If StatsD is enabled, monitor execution time of listeners.
if getattr(config.registry, "statsd", None):
statsd_client = config.registry.statsd
key = 'listeners.%s' % name
listener = statsd_client.timer(key)(listener.__call__)
listener_call = listener.__call__

is_async = asbool(settings.get(prefix + 'async', 'false'))
if is_async and hasattr(config.registry, 'workers'):
# Wrap the listener callback to use background workers.
listener_call = partial(async_listener,
config.registry.workers,
listener_call,
listener.done)
else:
# If StatsD is enabled, monitor execution time of listeners.
if hasattr(config.registry, 'statsd'):
statsd_client = config.registry.statsd
key = 'listeners.%s' % name
listener_call = statsd_client.timer(key)(listener_call)

# Default actions are write actions only.
actions = aslist(settings.get(prefix + 'actions', ''))
if len(actions) > 0:
actions = ACTIONS.from_string_list(actions)
actions = events.ACTIONS.from_string_list(actions)
else:
actions = write_actions

# By default, it listens to every resources.
resource_names = aslist(settings.get(prefix + 'resources', ''))
options = dict(for_actions=actions, for_resources=resource_names)

if ACTIONS.READ in actions:
config.add_subscriber(listener, ResourceRead, **options)
# If read action is specified, subscribe to read event.
if events.ACTIONS.READ in actions:
event_cls = events.ResourceRead
config.add_subscriber(listener_call, event_cls, **options)
if len(actions) == 1:
return

config.add_subscriber(listener, ResourceChanged, **options)
# If write action is specified, subscribe to changed event.
event_cls = events.ResourceChanged
config.add_subscriber(listener_call, event_cls, **options)


def setup_workers(config):
settings = config.get_settings()

workers_mod = settings['background.workers']
workers_mod = config.maybe_dotted(workers_mod)
backend = workers_mod.load_from_config(config)
if not isinstance(backend, workers.WorkersBase):
raise ConfigurationError("Invalid workers backend: %s" % backend)
config.registry.workers = backend

heartbeat = workers.heartbeat(backend)
config.registry.heartbeats['workers'] = heartbeat


def load_default_settings(config, default_settings):
Expand Down
22 changes: 22 additions & 0 deletions cliquet/listeners/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import copy


class ListenerBase(object):
def __init__(self, *args, **kwargs):
Expand All @@ -8,3 +10,23 @@ def __call__(self, event):
:param event: Incoming event
"""
raise NotImplementedError()

def done(self, name, res_id, success, result):
from cliquet import logger
logger.info("Async listener done.",
name=name,
result_id=res_id,
success=success,
result=result)


def async_listener(workers, listener, callback, event):
"""
Execute the specified `listener` on the background workers.
"""
# With asynchronous listeners, the `event` is serialized (pickled).
# Since :class:`pyramid.utils.Request` is not pickable, we set it
# to None.
event = copy.copy(event)
event.request = None
workers.apply_async('event', listener, (event,), callback)
4 changes: 4 additions & 0 deletions cliquet/tests/support.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def route_url(*a, **kw):
return ''.join([p for p in parts if p])

self.route_url = route_url
self.current_resource_name = None

follow_subrequest = follow_subrequest

Expand Down Expand Up @@ -84,6 +85,8 @@ def __init__(self, *args, **kwargs):
self.storage = self.app.app.registry.storage
self.cache = self.app.app.registry.cache
self.permission = self.app.app.registry.permission
self.workers = self.app.app.registry.workers

self.headers = {
'Content-Type': 'application/json',
'Authorization': 'Basic bWF0OjE='
Expand Down Expand Up @@ -122,6 +125,7 @@ def tearDown(self):
self.storage.flush()
self.cache.flush()
self.permission.flush()
self.workers.close()


class ThreadMixin(object):
Expand Down
22 changes: 22 additions & 0 deletions cliquet/tests/test_initialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,3 +465,25 @@ def test_plugin_benefits_from_cors_setup(self):
}
resp = app.options('/v0/attachment', headers=headers, status=200)
self.assertIn('Access-Control-Allow-Origin', resp.headers)


class WorkersSetupTest(unittest.TestCase):

def test_wrong_class(self):
settings = {
'background.workers': 'cliquet.storage.redis',
'storage_url': '',
'storage_pool_size': 1
}
config = Configurator(settings=settings)
with self.assertRaises(ConfigurationError):
initialization.setup_workers(config)

def test_registers_heartbeat(self):
settings = {
'background.workers': 'cliquet.workers.memory',
}
config = Configurator(settings=settings)
config.registry.heartbeats = {}
initialization.setup_workers(config)
self.assertIsNotNone(config.registry.heartbeats.get('workers'))
Loading