diff --git a/hirefire/contrib/django/middleware.py b/hirefire/contrib/django/middleware.py index d0d5e390..750c2f3b 100644 --- a/hirefire/contrib/django/middleware.py +++ b/hirefire/contrib/django/middleware.py @@ -1,10 +1,11 @@ from __future__ import absolute_import + import os import re from django.conf import settings from django.core.exceptions import ImproperlyConfigured -from django.http import HttpResponse +from django.http import HttpResponse, JsonResponse try: # Django >= 1.10 @@ -14,8 +15,9 @@ # https://docs.djangoproject.com/en/1.10/topics/http/middleware/#upgrading-pre-django-1-10-style-middleware MiddlewareMixin = object - -from hirefire.procs import load_procs, dump_procs, HIREFIRE_FOUND +from hirefire.procs import ( + load_procs, serialize_procs, ProcSerializer, HIREFIRE_FOUND +) def setting(name, default=None): @@ -24,6 +26,7 @@ def setting(name, default=None): TOKEN = setting('HIREFIRE_TOKEN', 'development') PROCS = setting('HIREFIRE_PROCS', []) +USE_CONCURRENCY = setting('HIREFIRE_USE_CONCURRENCY', False) if not PROCS: raise ImproperlyConfigured('The HireFire Django middleware ' @@ -31,6 +34,23 @@ def setting(name, default=None): 'in the HIREFIRE_PROCS setting.') +class DjangoProcSerializer(ProcSerializer): + """ + Like :class:`ProcSerializer` but ensures close database connections. + + New threads in Django will open a new connection automatically once + ``django.db`` is imported but they do not close the connection if a + thread is terminated. + """ + + def __call__(self, args): + try: + return super(DjangoProcSerializer, self).__call__(args) + finally: + from django.db import close_old_connections + close_old_connections() + + class HireFireMiddleware(MiddlewareMixin): """ The Django middleware that is hardwired to the URL paths @@ -49,11 +69,14 @@ def test(self, request): def info(self, request): """ - The heart of the app, returning a JSON ecoded list - of proc results. + Return JSON response serializing all proc names and quantities. """ - payload = dump_procs(self.loaded_procs) - return HttpResponse(payload, content_type='application/json') + data = serialize_procs( + self.loaded_procs, + use_concurrency=USE_CONCURRENCY, + serializer_class=DjangoProcSerializer, + ) + return JsonResponse(data=data, safe=False) def process_request(self, request): path = request.path diff --git a/hirefire/procs/__init__.py b/hirefire/procs/__init__.py index 7fa15f93..21044d81 100644 --- a/hirefire/procs/__init__.py +++ b/hirefire/procs/__init__.py @@ -1,20 +1,25 @@ import json +import os from collections import OrderedDict - -from ..utils import import_attribute, TimeAwareJSONEncoder +from concurrent.futures import ThreadPoolExecutor import six +from ..utils import import_attribute, TimeAwareJSONEncoder -__all__ = ('loaded_procs', 'Proc', 'load_proc', 'load_procs', 'dump_procs') - +__all__ = ( + 'loaded_procs', 'Proc', 'load_proc', 'load_procs', 'dump_procs', + 'serialize_procs', 'ProcSerializer', +) HIREFIRE_FOUND = 'HireFire Middleware Found!' +USE_CONCURRENCY = os.environ.get('HIREFIRE_USE_CONCURRENCY', False) class Procs(OrderedDict): pass + loaded_procs = Procs() @@ -59,25 +64,46 @@ def load_procs(*procs): return loaded_procs -def native_dump_procs(procs): +class ProcSerializer: """ - Given a list of loaded procs, dump the data for them into - a list of dictionaries in the form expected by HireFire, - ready to be encoded into JSON. + Callable that transforms procs to dictionaries. + + Maintains an instance cache that will be reused across calls. """ - data = [] - cache = {} - for name, proc in procs.items(): + def __init__(self): + self.cache = {} + + def __call__(self, args): + name, proc = args try: - quantity = proc.quantity(cache=cache) + quantity = proc.quantity(cache=self.cache) except TypeError: quantity = proc.quantity() - - data.append({ + return { 'name': name, 'quantity': quantity or 0, - }) - return data + } + + +def serialize_procs(procs, use_concurrency=USE_CONCURRENCY, + serializer_class=ProcSerializer): + """ + Given a list of loaded procs, serialize the data for them into + a list of dictionaries in the form expected by HireFire, + ready to be encoded into JSON. + """ + serializer = serializer_class() + + if use_concurrency: + with ThreadPoolExecutor() as executor: + # Execute all procs in parallel to avoid blocking IO + # especially celery which needs to open a transport to AMQP. + proc_iterator = executor.map(serializer, procs.items()) + else: + proc_iterator = map(serializer, procs.items()) + + # Return a list, since json.dumps does not support generators. + return list(proc_iterator) def dump_procs(procs): @@ -85,7 +111,7 @@ def dump_procs(procs): Given a list of loaded procs dumps the data for them in JSON format. """ - data = native_dump_procs(procs) + data = serialize_procs(procs) return json.dumps(data, cls=TimeAwareJSONEncoder, ensure_ascii=False) @@ -178,6 +204,7 @@ def quantity(self): class for an example. """ + def __init__(self, *args, **kwargs): super(ClientProc, self).__init__(*args, **kwargs) self.clients = [] diff --git a/setup.py b/setup.py index 1ca9d545..a6f4d7c4 100644 --- a/setup.py +++ b/setup.py @@ -37,7 +37,10 @@ def find_version(*parts): author='Jannis Leidel', author_email='jannis@leidel.info', packages=find_packages(), - install_requires=['six'], + install_requires=[ + 'six', + 'futures; python_version == "2.7"', + ], classifiers=[ 'Development Status :: 4 - Beta', 'Environment :: Web Environment',