Commit 719e61b

Raise a RuntimeError if asyncio is not available.
Fixes #830.
berkerpeksag authored and benoitc committed Aug 16, 2014
1 parent 88804ae commit 719e61b
Showing 3 changed files with 144 additions and 145 deletions.
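
Note: after this commit, importing the worker on an unsupported interpreter fails at import time with a clear RuntimeError rather than with the SyntaxError that the old Python-3-only module body (yield from) would produce on Python 2. A minimal sketch of what a caller would observe, assuming a Python 2.x interpreter or a Python 3.3+ environment without aiohttp (the error messages are the ones raised by the new gunicorn/workers/gaiohttp.py shown below):

    # Illustrative only; the two messages come from the new gaiohttp.py.
    try:
        from gunicorn.workers.gaiohttp import AiohttpWorker
    except RuntimeError as exc:
        # e.g. "You need Python >= 3.3 to use the asyncio worker"
        # or "You need aiohttp installed to use this worker."
        print("gaiohttp worker unavailable: %s" % exc)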
128 changes: 128 additions & 0 deletions gunicorn/workers/_gaiohttp.py
@@ -0,0 +1,128 @@
# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.

import asyncio
import functools
import os
import gunicorn.workers.base as base

from aiohttp.wsgi import WSGIServerHttpProtocol


class AiohttpWorker(base.Worker):

    def __init__(self, *args, **kw): # pragma: no cover
        super().__init__(*args, **kw)

        self.servers = []
        self.connections = {}

    def init_process(self):
        # create new event_loop after fork
        asyncio.get_event_loop().close()

        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)

        super().init_process()

    def run(self):
        self._runner = asyncio.async(self._run(), loop=self.loop)

        try:
            self.loop.run_until_complete(self._runner)
        finally:
            self.loop.close()

    def wrap_protocol(self, proto):
        proto.connection_made = _wrp(
            proto, proto.connection_made, self.connections)
        proto.connection_lost = _wrp(
            proto, proto.connection_lost, self.connections, False)
        return proto

    def factory(self, wsgi, addr):
        proto = WSGIServerHttpProtocol(
            wsgi, readpayload=True,
            loop=self.loop,
            log=self.log,
            debug=self.cfg.debug,
            keep_alive=self.cfg.keepalive,
            access_log=self.log.access_log,
            access_log_format=self.cfg.access_log_format)
        return self.wrap_protocol(proto)

    def get_factory(self, sock, addr):
        return functools.partial(self.factory, self.wsgi, addr)

    @asyncio.coroutine
    def close(self):
        try:
            if hasattr(self.wsgi, 'close'):
                yield from self.wsgi.close()
        except:
            self.log.exception('Process shutdown exception')

    @asyncio.coroutine
    def _run(self):
        for sock in self.sockets:
            factory = self.get_factory(sock.sock, sock.cfg_addr)
            self.servers.append(
                (yield from self.loop.create_server(factory, sock=sock.sock)))

        # If our parent changed then we shut down.
        pid = os.getpid()
        try:
            while self.alive or self.connections:
                self.notify()

                if (self.alive and
                        pid == os.getpid() and self.ppid != os.getppid()):
                    self.log.info("Parent changed, shutting down: %s", self)
                    self.alive = False

                # stop accepting requests
                if not self.alive:
                    if self.servers:
                        self.log.info(
                            "Stopping server: %s, connections: %s",
                            pid, len(self.connections))
                        for server in self.servers:
                            server.close()
                        self.servers.clear()

                    # prepare connections for closing
                    for conn in self.connections.values():
                        if hasattr(conn, 'closing'):
                            conn.closing()

                yield from asyncio.sleep(1.0, loop=self.loop)
        except KeyboardInterrupt:
            pass

        if self.servers:
            for server in self.servers:
                server.close()

        yield from self.close()


class _wrp:

    def __init__(self, proto, meth, tracking, add=True):
        self._proto = proto
        self._id = id(proto)
        self._meth = meth
        self._tracking = tracking
        self._add = add

    def __call__(self, *args):
        if self._add:
            self._tracking[self._id] = self._proto
        elif self._id in self._tracking:
            del self._tracking[self._id]

        conn = self._meth(*args)
        return conn
139 changes: 12 additions & 127 deletions gunicorn/workers/gaiohttp.py
@@ -2,131 +2,16 @@
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
__all__ = ['AiohttpWorker']

import asyncio
import functools
import os
import gunicorn.workers.base as base

try:
    from aiohttp.wsgi import WSGIServerHttpProtocol
except ImportError:
    raise RuntimeError("You need aiohttp installed to use this worker.")


class AiohttpWorker(base.Worker):

    def __init__(self, *args, **kw): # pragma: no cover
        super().__init__(*args, **kw)

        self.servers = []
        self.connections = {}

    def init_process(self):
        # create new event_loop after fork
        asyncio.get_event_loop().close()

        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)

        super().init_process()

    def run(self):
        self._runner = asyncio.async(self._run(), loop=self.loop)

        try:
            self.loop.run_until_complete(self._runner)
        finally:
            self.loop.close()

    def wrap_protocol(self, proto):
        proto.connection_made = _wrp(
            proto, proto.connection_made, self.connections)
        proto.connection_lost = _wrp(
            proto, proto.connection_lost, self.connections, False)
        return proto

    def factory(self, wsgi, addr):
        proto = WSGIServerHttpProtocol(
            wsgi, readpayload=True,
            loop=self.loop,
            log=self.log,
            debug=self.cfg.debug,
            keep_alive=self.cfg.keepalive,
            access_log=self.log.access_log,
            access_log_format=self.cfg.access_log_format)
        return self.wrap_protocol(proto)

    def get_factory(self, sock, addr):
        return functools.partial(self.factory, self.wsgi, addr)

    @asyncio.coroutine
    def close(self):
        try:
            if hasattr(self.wsgi, 'close'):
                yield from self.wsgi.close()
        except:
            self.log.exception('Process shutdown exception')

    @asyncio.coroutine
    def _run(self):
        for sock in self.sockets:
            factory = self.get_factory(sock.sock, sock.cfg_addr)
            self.servers.append(
                (yield from self.loop.create_server(factory, sock=sock.sock)))

        # If our parent changed then we shut down.
        pid = os.getpid()
        try:
            while self.alive or self.connections:
                self.notify()

                if (self.alive and
                        pid == os.getpid() and self.ppid != os.getppid()):
                    self.log.info("Parent changed, shutting down: %s", self)
                    self.alive = False

                # stop accepting requests
                if not self.alive:
                    if self.servers:
                        self.log.info(
                            "Stopping server: %s, connections: %s",
                            pid, len(self.connections))
                        for server in self.servers:
                            server.close()
                        self.servers.clear()

                    # prepare connections for closing
                    for conn in self.connections.values():
                        if hasattr(conn, 'closing'):
                            conn.closing()

                yield from asyncio.sleep(1.0, loop=self.loop)
        except KeyboardInterrupt:
            pass

        if self.servers:
            for server in self.servers:
                server.close()

        yield from self.close()


class _wrp:

    def __init__(self, proto, meth, tracking, add=True):
        self._proto = proto
        self._id = id(proto)
        self._meth = meth
        self._tracking = tracking
        self._add = add

    def __call__(self, *args):
        if self._add:
            self._tracking[self._id] = self._proto
        elif self._id in self._tracking:
            del self._tracking[self._id]

        conn = self._meth(*args)
        return conn
import sys

if sys.version_info >= (3, 3):
    try:
        import aiohttp
    except ImportError:
        raise RuntimeError("You need aiohttp installed to use this worker.")
    else:
        from gunicorn.workers._gaiohttp import AiohttpWorker
        __all__ = ['AiohttpWorker']
else:
    raise RuntimeError("You need Python >= 3.3 to use the asyncio worker")
22 changes: 4 additions & 18 deletions setup.py
@@ -5,15 +5,12 @@


import os
from setuptools import setup
from setuptools.command.test import test as TestCommand

import sys

from gunicorn import __version__

from setuptools import setup, find_packages
from setuptools.command.test import test as TestCommand

ASYNCIO_COMPAT = sys.version_info >= (3, 3)
from gunicorn import __version__


CLASSIFIERS = [
@@ -71,17 +68,6 @@ def run_tests(self):

REQUIREMENTS = []

py_modules = []

for root, folders, files in os.walk('gunicorn'):
    for f in files:
        if f.endswith('.py') and (ASYNCIO_COMPAT or f != 'gaiohttp.py'):
            full = os.path.join(root, f[:-3])
            parts = full.split(os.path.sep)
            modname = '.'.join(parts)
            py_modules.append(modname)


setup(
    name = 'gunicorn',
    version = __version__,
@@ -95,7 +81,7 @@ def run_tests(self):

    classifiers = CLASSIFIERS,
    zip_safe = False,
    py_modules = py_modules,
    packages = find_packages(exclude=['examples', 'tests']),
    include_package_data = True,

    tests_require = tests_require,

1 comment on commit 719e61b

@koobs commented on 719e61b, Feb 25, 2015

This broke install-time compilation (a regression of what 86f7404 fixed): _gaiohttp.py is now installed unconditionally, its byte-compilation (pyc/pyo) fails, and as a result the --record output is incorrect.

See #788
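
A rough, untested sketch of how the previous behaviour could be restored on top of find_packages(), in the spirit of the removed py_modules loop; the build_py subclass and its wiring below are an assumption for illustration only, not the fix that was actually applied:

    # Sketch only (assumed approach): skip the asyncio-only module when
    # building on interpreters that cannot byte-compile it.
    import sys

    from setuptools import setup, find_packages
    from setuptools.command.build_py import build_py

    ASYNCIO_COMPAT = sys.version_info >= (3, 3)

    class build_py_skip_gaiohttp(build_py):
        def find_package_modules(self, package, package_dir):
            # distutils yields (package, module, filename) tuples
            modules = build_py.find_package_modules(self, package, package_dir)
            if ASYNCIO_COMPAT:
                return modules
            # On Python < 3.3, drop gunicorn/workers/_gaiohttp.py from the build
            return [m for m in modules if m[1] != '_gaiohttp']

    setup(
        name='gunicorn',
        packages=find_packages(exclude=['examples', 'tests']),
        cmdclass={'build_py': build_py_skip_gaiohttp},
        # ... remaining arguments as in the real setup.py ...
    )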
