Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use concurrent futures to reduce blocking IO #37

Merged
merged 7 commits into from
Aug 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 30 additions & 7 deletions hirefire/contrib/django/middleware.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from __future__ import absolute_import

import os
import re

from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from django.http import HttpResponse
from django.http import HttpResponse, JsonResponse

try:
# Django >= 1.10
Expand All @@ -14,8 +15,9 @@
# https://docs.djangoproject.com/en/1.10/topics/http/middleware/#upgrading-pre-django-1-10-style-middleware
MiddlewareMixin = object


from hirefire.procs import load_procs, dump_procs, HIREFIRE_FOUND
from hirefire.procs import (
load_procs, serialize_procs, ProcSerializer, HIREFIRE_FOUND
)


def setting(name, default=None):
Expand All @@ -24,13 +26,31 @@ def setting(name, default=None):

TOKEN = setting('HIREFIRE_TOKEN', 'development')
PROCS = setting('HIREFIRE_PROCS', [])
USE_CONCURRENCY = setting('HIREFIRE_USE_CONCURRENCY', False)

if not PROCS:
raise ImproperlyConfigured('The HireFire Django middleware '
'requires at least one proc defined '
'in the HIREFIRE_PROCS setting.')


class DjangoProcSerializer(ProcSerializer):
"""
Like :class:`ProcSerializer` but ensures close database connections.

New threads in Django will open a new connection automatically once
``django.db`` is imported but they do not close the connection if a
thread is terminated.
"""

def __call__(self, args):
try:
return super(DjangoProcSerializer, self).__call__(args)
finally:
from django.db import close_old_connections
close_old_connections()


class HireFireMiddleware(MiddlewareMixin):
"""
The Django middleware that is hardwired to the URL paths
Expand All @@ -49,11 +69,14 @@ def test(self, request):

def info(self, request):
"""
The heart of the app, returning a JSON ecoded list
of proc results.
Return JSON response serializing all proc names and quantities.
"""
payload = dump_procs(self.loaded_procs)
return HttpResponse(payload, content_type='application/json')
data = serialize_procs(
self.loaded_procs,
use_concurrency=USE_CONCURRENCY,
serializer_class=DjangoProcSerializer,
)
return JsonResponse(data=data, safe=False)

def process_request(self, request):
path = request.path
Expand Down
61 changes: 44 additions & 17 deletions hirefire/procs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
import json
import os
from collections import OrderedDict

from ..utils import import_attribute, TimeAwareJSONEncoder
from concurrent.futures import ThreadPoolExecutor

import six

from ..utils import import_attribute, TimeAwareJSONEncoder

__all__ = ('loaded_procs', 'Proc', 'load_proc', 'load_procs', 'dump_procs')

__all__ = (
'loaded_procs', 'Proc', 'load_proc', 'load_procs', 'dump_procs',
'serialize_procs', 'ProcSerializer',
)

HIREFIRE_FOUND = 'HireFire Middleware Found!'
USE_CONCURRENCY = os.environ.get('HIREFIRE_USE_CONCURRENCY', False)


class Procs(OrderedDict):
pass


loaded_procs = Procs()


Expand Down Expand Up @@ -59,33 +64,54 @@ def load_procs(*procs):
return loaded_procs


def native_dump_procs(procs):
class ProcSerializer:
"""
Given a list of loaded procs, dump the data for them into
a list of dictionaries in the form expected by HireFire,
ready to be encoded into JSON.
Callable that transforms procs to dictionaries.

Maintains an instance cache that will be reused across calls.
"""
data = []
cache = {}
for name, proc in procs.items():
def __init__(self):
self.cache = {}

def __call__(self, args):
name, proc = args
try:
quantity = proc.quantity(cache=cache)
quantity = proc.quantity(cache=self.cache)
except TypeError:
quantity = proc.quantity()

data.append({
return {
'name': name,
'quantity': quantity or 0,
})
return data
}


def serialize_procs(procs, use_concurrency=USE_CONCURRENCY,
serializer_class=ProcSerializer):
"""
Given a list of loaded procs, serialize the data for them into
a list of dictionaries in the form expected by HireFire,
ready to be encoded into JSON.
"""
serializer = serializer_class()

if use_concurrency:
with ThreadPoolExecutor() as executor:
# Execute all procs in parallel to avoid blocking IO
# especially celery which needs to open a transport to AMQP.
proc_iterator = executor.map(serializer, procs.items())
else:
proc_iterator = map(serializer, procs.items())

# Return a list, since json.dumps does not support generators.
return list(proc_iterator)


def dump_procs(procs):
"""
Given a list of loaded procs dumps the data for them in
JSON format.
"""
data = native_dump_procs(procs)
data = serialize_procs(procs)
return json.dumps(data, cls=TimeAwareJSONEncoder, ensure_ascii=False)


Expand Down Expand Up @@ -178,6 +204,7 @@ def quantity(self):
class for an example.

"""

def __init__(self, *args, **kwargs):
super(ClientProc, self).__init__(*args, **kwargs)
self.clients = []
Expand Down
5 changes: 4 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ def find_version(*parts):
author='Jannis Leidel',
author_email='jannis@leidel.info',
packages=find_packages(),
install_requires=['six'],
install_requires=[
'six',
'futures; python_version == "2.7"',
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't know about this syntax. This is cool, and helped me find this article: https://hynek.me/articles/conditional-python-dependencies/

],
classifiers=[
'Development Status :: 4 - Beta',
'Environment :: Web Environment',
Expand Down