gunicorn worker for aiohttp.web
fafhrd91 committed Dec 1, 2014
1 parent 558eb8f commit 66c7033
Showing 8 changed files with 179 additions and 215 deletions.
13 changes: 13 additions & 0 deletions CHANGES.txt
@@ -1,18 +1,28 @@
CHANGES
=======

Unreleased
-------------------

- Gunicorn worker for aiohttp.web

- Removed deprecated AsyncGunicornWorker


0.11.0 (11-29-2014)
-------------------

- Support named routes in `aiohttp.web.UrlDispatcher` #179

- Make websocket subprotocols conform to spec #181


0.10.2 (11-19-2014)
-------------------

- Don't unquote `environ['PATH_INFO']` in wsgi.py #177


0.10.1 (11-17-2014)
-------------------

@@ -21,6 +31,7 @@ CHANGES

- Fix multidict `__iter__`, the method should iterate over keys, not (key, value) pairs.


0.10.0 (11-13-2014)
-------------------

@@ -38,11 +49,13 @@ CHANGES

- Set server.transport to None on .closing() #172


0.9.3 (10-30-2014)
------------------

- Fix compatibility with asyncio 3.4.1+ #170


0.9.2 (10-16-2014)
------------------

1 change: 1 addition & 0 deletions aiohttp/log.py
@@ -5,4 +5,5 @@
client_log = logging.getLogger('aiohttp.client')
internal_log = logging.getLogger('aiohttp.internal')
server_log = logging.getLogger('aiohttp.server')
web_log = logging.getLogger('aiohttp.web')
websocket_log = logging.getLogger('aiohttp.websocket')
18 changes: 14 additions & 4 deletions aiohttp/server.py
@@ -72,10 +72,18 @@ class ServerHttpProtocol(aiohttp.StreamProtocol):

_request_parser = aiohttp.HttpRequestParser() # default request parser

def __init__(self, *, loop=None, keep_alive=None,
timeout=15, tcp_keepalive=True, allowed_methods=(),
debug=False, log=server_log, access_log=access_log,
access_log_format=ACCESS_LOG_FORMAT, **kwargs):
def __init__(self, *, loop=None,
keep_alive=None,
timeout=15,
tcp_keepalive=True,
allowed_methods=(),
log=server_log,
access_log=access_log,
access_log_format=ACCESS_LOG_FORMAT,
host="",
port=0,
debug=False,
**kwargs):
super().__init__(loop=loop, **kwargs)

self._keep_alive_period = keep_alive # number of seconds to keep alive
@@ -84,6 +92,8 @@ def __init__(self, *, loop=None, keep_alive=None,
self._request_prefix = aiohttp.HttpPrefixParser(allowed_methods)
self._loop = loop if loop is not None else asyncio.get_event_loop()

self.host = host
self.port = port
self.log = log
self.debug = debug
self.access_log = access_log
56 changes: 52 additions & 4 deletions aiohttp/web.py
@@ -13,6 +13,7 @@
from urllib.parse import urlsplit, parse_qsl, unquote, urlencode

from .abc import AbstractRouter, AbstractMatchInfo
from .log import web_log
from .multidict import (CaseInsensitiveMultiDict,
CaseInsensitiveMutableMultiDict,
MultiDict,
@@ -1100,6 +1101,16 @@ def __init__(self, app, **kwargs):
super().__init__(**kwargs)
self._app = app

def connection_made(self, transport):
super().connection_made(transport)

self._app.connection_made(self, transport)

def connection_lost(self, exc):
self._app.connection_lost(self, exc)

super().connection_lost(exc)

@asyncio.coroutine
def handle_request(self, message, payload):
request = Request(self._app, message, payload,
@@ -1136,13 +1147,13 @@ def __init__(self, *, loop=None, router=None, **kwargs):
# TODO: explicitly accept *debug* param
if loop is None:
loop = asyncio.get_event_loop()
self._kwargs = kwargs
if router is None:
router = UrlDispatcher()
assert isinstance(router, AbstractRouter), router
self._router = router
self._loop = loop
self._finish_callbacks = []
self._connections = {}

@property
def router(self):
@@ -1152,12 +1163,26 @@ def router(self):
def loop(self):
return self._loop

def make_handler(self):
return RequestHandler(self, loop=self._loop, **self._kwargs)
def make_handler(self, **kwargs):
return RequestHandler(self, loop=self._loop, **kwargs)

@property
def connections(self):
return list(self._connections.keys())

def connection_made(self, handler, transport):
self._connections[handler] = transport

def connection_lost(self, handler, exc=None):
if handler in self._connections:
del self._connections[handler]

@asyncio.coroutine
def finish(self):
for (cb, args, kwargs) in self._finish_callbacks:
callbacks = self._finish_callbacks
self._finish_callbacks = []

for (cb, args, kwargs) in callbacks:
try:
res = cb(*args, **kwargs)
if (asyncio.iscoroutine(res) or
@@ -1170,5 +1195,28 @@ def finish(self):
'application': self,
})

@asyncio.coroutine
def finish_connections(self, timeout=None):
for handler in self._connections.keys():
handler.closing()

def cleanup():
while self._connections:
yield from asyncio.sleep(0.5, loop=self._loop)

if timeout:
try:
yield from asyncio.wait_for(
cleanup(), timeout, loop=self._loop)
except asyncio.TimeoutError:
web_log.warn(
"Not all connections closed (pending: %d)",
len(self._connections))

for transport in self._connections.values():
transport.close()

self._connections.clear()

def register_on_finish(self, func, *args, **kwargs):
self._finish_callbacks.insert(0, (func, args, kwargs))
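
The sketch below shows how the reshaped Application API in this diff (make_handler() now accepting the protocol keyword arguments, plus finish_connections() and finish() for shutdown) might be driven by a plain asyncio server. It is illustrative only: the add_route() call and the StreamResponse write sequence are assumed from examples/web_srv.py and may differ slightly in this version.

import asyncio
from aiohttp import web

@asyncio.coroutine
def hello(request):
    # Minimal handler modelled on examples/web_srv.py; the exact
    # StreamResponse write API here is an assumption for this version.
    resp = web.StreamResponse(request)
    body = b'Hello, world'
    resp.content_length = len(body)
    resp.send_headers()
    resp.write(body)
    return resp

loop = asyncio.get_event_loop()
app = web.Application(loop=loop)
app.router.add_route('GET', '/', hello)
app.register_on_finish(lambda: print('application finished'))

# make_handler() returns a fresh RequestHandler per call, so the bound
# method itself can serve as the per-connection protocol factory.
srv = loop.run_until_complete(
    loop.create_server(app.make_handler, '127.0.0.1', 8080))

try:
    loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    srv.close()  # stop accepting new connections
    # close keep-alive handlers, then run the register_on_finish callbacks
    loop.run_until_complete(app.finish_connections(timeout=10.0))
    loop.run_until_complete(app.finish())
    loop.close()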
119 changes: 29 additions & 90 deletions aiohttp/worker.py
@@ -1,24 +1,18 @@
"""Async gunicorn worker."""
__all__ = ['AsyncGunicornWorker', 'PortMapperWorker']
"""Async gunicorn worker for auihttp.wen.Application."""
__all__ = ['GunicornWebWorker']

import asyncio
import functools
import os
import gunicorn.workers.base as base
import warnings

from aiohttp.wsgi import WSGIServerHttpProtocol


class AsyncGunicornWorker(base.Worker):
class GunicornWebWorker(base.Worker):

@asvetlov commented on Dec 5, 2014:

Why do you need gunicorn worker at all?
Doesn't gunicorn.workers.gaiohttp satisfy you?


def __init__(self, *args, **kw): # pragma: no cover
warnings.warn("AsyncGunicornWorker is deprecated "
"starting from 0.11 release, "
"use standard gaiohttp worker.", DeprecationWarning)
super().__init__(*args, **kw)

self.servers = []
self.connections = {}

def init_process(self):
# create new event_loop after fork
@@ -37,33 +31,36 @@ def run(self):
finally:
self.loop.close()

def wrap_protocol(self, proto):
proto.connection_made = _wrp(
proto, proto.connection_made, self.connections)
proto.connection_lost = _wrp(
proto, proto.connection_lost, self.connections, False)
return proto

def factory(self, wsgi, host, port):
proto = WSGIServerHttpProtocol(
wsgi, loop=self.loop,
def factory(self, app, host, port):
return app.make_handler(
host=host,
port=port,
log=self.log,
debug=self.cfg.debug,
keep_alive=self.cfg.keepalive,
access_log=self.log.access_log,
access_log_format=self.cfg.access_log_format)
return self.wrap_protocol(proto)

def get_factory(self, sock, host, port):
return functools.partial(self.factory, self.wsgi, host, port)

@asyncio.coroutine
def close(self):
try:
if hasattr(self.wsgi, 'close'):
yield from self.wsgi.close()
except:
self.log.exception('Process shutdown exception')
if self.servers:
self.log.info("Stopping server: %s, connections: %s",
self.pid, len(self.wsgi.connections))

# stop accepting connections
for server in self.servers:
server.close()
self.servers.clear()

# stop alive connections
yield from self.wsgi.finish_connections(
timeout=self.cfg.graceful_timeout / 100 * 80)

# stop application
yield from self.wsgi.finish()

@asyncio.coroutine
def _run(self):
@@ -75,73 +72,15 @@ def _run(self):
# If our parent changed then we shut down.
pid = os.getpid()
try:
while self.alive or self.connections:
while self.alive:
self.notify()

if (self.alive and
pid == os.getpid() and self.ppid != os.getppid()):
self.log.info("Parent changed, shutting down: %s", self)
if pid == os.getpid() and self.ppid != os.getppid():
self.alive = False

# stop accepting requests
if not self.alive:
if self.servers:
self.log.info(
"Stopping server: %s, connections: %s",
pid, len(self.connections))
for server in self.servers:
server.close()
self.servers.clear()

# prepare connections for closing
for conn in self.connections.values():
if hasattr(conn, 'closing'):
conn.closing()

yield from asyncio.sleep(1.0, loop=self.loop)
except KeyboardInterrupt:
self.log.info("Parent changed, shutting down: %s", self)
else:
yield from asyncio.sleep(1.0, loop=self.loop)
except (Exception, BaseException, GeneratorExit, KeyboardInterrupt):
pass

if self.servers:
for server in self.servers:
server.close()

yield from self.close()


class PortMapperWorker(AsyncGunicornWorker):
"""Special worker that uses different wsgi application depends on port.
Main wsgi application object has to be dictionary:
"""

def get_factory(self, sock, host, port):
return functools.partial(self.factory, self.wsgi[port], host, port)

@asyncio.coroutine
def close(self):
for port, wsgi in self.wsgi.items():
try:
if hasattr(wsgi, 'close'):
yield from wsgi.close()
except:
self.log.exception('Process shutdown exception')


class _wrp:

def __init__(self, proto, meth, tracking, add=True):
self._proto = proto
self._id = id(proto)
self._meth = meth
self._tracking = tracking
self._add = add

def __call__(self, *args):
if self._add:
self._tracking[self._id] = self._proto
elif self._id in self._tracking:
del self._tracking[self._id]

conn = self._meth(*args)
return conn
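
Under gunicorn, the new worker expects the application object gunicorn loads (self.wsgi) to be an aiohttp.web.Application, since the factory() and close() paths above call make_handler(), finish_connections() and finish() on it. A minimal sketch of such a module follows; the module name myapp and its handler are hypothetical, the routing call is assumed from examples/web_srv.py, and the event-loop wiring between gunicorn's fork and the Application may need adjustment in practice.

# myapp.py (hypothetical module name)
import asyncio
from aiohttp import web

@asyncio.coroutine
def index(request):
    resp = web.StreamResponse(request)
    body = b'Hello from GunicornWebWorker'
    resp.content_length = len(body)
    resp.send_headers()
    resp.write(body)
    return resp

# The worker builds a handler via app.make_handler(...) for the sockets it
# serves, so the exported object must be the Application itself, not a
# WSGI callable.
app = web.Application()
app.router.add_route('GET', '/', index)

# Launched with something like:
#   gunicorn -w 2 -k aiohttp.worker.GunicornWebWorker myapp:app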
2 changes: 1 addition & 1 deletion examples/web_srv.py
@@ -32,7 +32,7 @@ def change_body(request):
@asyncio.coroutine
def hello(request):
resp = StreamResponse(request)
name = request.match_info.get('name', 'Anonimous')
name = request.match_info.get('name', 'Anonymous')
answer = ('Hello, ' + name).encode('utf8')
resp.content_length = len(answer)
resp.send_headers()

1 comment on commit 66c7033

@fafhrd91 (Member, Author) commented:

gaiohttp only works with WSGIServerHttpProtocol
