Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Define web.Application.on_startup() signal handler #1103

Merged
merged 1 commit into from
Aug 22, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions aiohttp/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ def __init__(self, *, logger=web_logger, loop=None,
self._on_pre_signal = PreSignal()
self._on_post_signal = PostSignal()
self._on_response_prepare = Signal(self)
self._on_startup = Signal(self)
self._on_shutdown = Signal(self)
self._on_cleanup = Signal(self)

Expand All @@ -190,6 +191,10 @@ def on_pre_signal(self):
def on_post_signal(self):
return self._on_post_signal

@property
def on_startup(self):
return self._on_startup

@property
def on_shutdown(self):
return self._on_shutdown
Expand All @@ -214,6 +219,14 @@ def make_handler(self, **kwargs):
return self._handler_factory(
self, self.router, loop=self.loop, **kwargs)

@asyncio.coroutine
def startup(self):
"""Causes on_startup signal

Should be called in the event loop along with the request handler.
"""
yield from self.on_startup.send(self)

@asyncio.coroutine
def shutdown(self):
"""Causes on_shutdown signal
Expand Down Expand Up @@ -267,9 +280,11 @@ def run_app(app, *, host='0.0.0.0', port=None,
loop = app.loop

handler = app.make_handler()
srv = loop.run_until_complete(loop.create_server(handler, host, port,
ssl=ssl_context,
backlog=backlog))
server = loop.create_server(handler, host, port, ssl=ssl_context,
backlog=backlog)
srv, startup_res = loop.run_until_complete(asyncio.gather(server,
app.startup(),
loop=loop))

scheme = 'https' if ssl_context else 'http'
print("======== Running on {scheme}://{host}:{port}/ ========\n"
Expand Down
59 changes: 58 additions & 1 deletion docs/web.rst
Original file line number Diff line number Diff line change
Expand Up @@ -944,7 +944,7 @@ handler::

return ws

Signal handler may looks like::
Signal handler may look like::

async def on_shutdown(app):
for ws in app['websockets']:
Expand Down Expand Up @@ -986,6 +986,63 @@ finalizing. It's pretty close to :func:`run_app` utility function::
loop.run_until_complete(app.cleanup())
loop.close()

.. _aiohttp-web-background-tasks:

Background tasks
-----------------

Sometimes there's a need to perform some asynchronous operations just
after application start-up.

Even more, in some sophisticated systems there could be a need to run some
background tasks in the event loop along with the application's request
handler. Such as listening to message queue or other network message/event
sources (e.g. ZeroMQ, Redis Pub/Sub, AMQP, etc.) to react to received messages
within the application.

For example the background task could listen to ZeroMQ on :data:`zmq.SUB` socket,
process and forward retrieved messages to clients connected via WebSocket
that are stored somewhere in the application
(e.g. in the :obj:`application['websockets']` list).

To run such short and long running background tasks aiohttp provides an
ability to register :attr:`Application.on_startup` signal handler(s) that
will run along with the application's request handler.

For example there's a need to run one quick task and two long running
tasks that will live till the application is alive. The appropriate
background tasks could be registered as an :attr:`Application.on_startup`
signal handlers as shown in the example below::

app = web.Application()

async def quickly_notify_monitoring(app):
"""Send notification to monitoring service about the app process start-up"""
pass

async def listen_to_zeromq(app):
"""Listen to messages on zmq.SUB socket"""
pass

async def listen_to_redis(app):
"""Listen to messages from Redis Pub/Sub"""
pass

async def run_all_long_running_tasks(app):
return await asyncio.gather(listen_to_zeromq(app),
listen_to_redis(app),
loop=app.loop)
app.on_startup.append(quickly_notify_monitoring)
app.on_startup.append(run_all_long_running_tasks)
web.run_app(app)


The :func:`quickly_notify_monitoring` from the example above will complete
and exit but :func:`listen_to_zeromq` and :func:`listen_to_redis` will take
forever.
An :attr:`Application.on_cleanup` signal handler may be used to send a
cancellation to all registered long-running tasks.


CORS support
------------
Expand Down
24 changes: 23 additions & 1 deletion docs/web_reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1042,6 +1042,21 @@ duplicated like one using :meth:`Application.copy`.
async def on_prepare(request, response):
pass

.. attribute:: on_startup

A :class:`~aiohttp.signals.Signal` that is fired on application start-up.

Subscribers may use the signal to run background tasks in the event
loop along with the application's request handler just after the
application start-up.

Signal handlers should have the following signature::

async def on_startup(app):
pass

.. seealso:: :ref:`aiohttp-web-background-tasks`.

.. attribute:: on_shutdown

A :class:`~aiohttp.signals.Signal` that is fired on application shutdown.
Expand Down Expand Up @@ -1076,7 +1091,6 @@ duplicated like one using :meth:`Application.copy`.

.. seealso:: :ref:`aiohttp-web-graceful-shutdown` and :attr:`on_shutdown`.


.. method:: make_handler(**kwargs)

Creates HTTP protocol factory for handling requests.
Expand Down Expand Up @@ -1104,6 +1118,14 @@ duplicated like one using :meth:`Application.copy`.
secure_proxy_ssl_header='X-Forwarded-Proto'),
'0.0.0.0', 8080)

.. coroutinemethod:: startup()

A :ref:`coroutine<coroutine>` that will be called along with the
application's request handler.

The purpose of the method is calling :attr:`on_startup` signal
handlers.

.. coroutinemethod:: shutdown()

A :ref:`coroutine<coroutine>` that should be called on
Expand Down
8 changes: 8 additions & 0 deletions tests/test_run_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,22 @@ def test_run_app_http(loop, mocker):
loop.call_later(0.02, loop.stop)

app = web.Application(loop=loop)
mocker.spy(app, 'startup')

web.run_app(app, print=lambda *args: None)

assert loop.is_closed()
loop.create_server.assert_called_with(mock.ANY, '0.0.0.0', 8080,
ssl=None, backlog=128)
app.startup.assert_called_once_with()


def test_run_app_https(loop, mocker):
mocker.spy(loop, 'create_server')
loop.call_later(0.02, loop.stop)

app = web.Application(loop=loop)
mocker.spy(app, 'startup')

ssl_context = ssl.create_default_context()

Expand All @@ -30,6 +33,7 @@ def test_run_app_https(loop, mocker):
assert loop.is_closed()
loop.create_server.assert_called_with(mock.ANY, '0.0.0.0', 8443,
ssl=ssl_context, backlog=128)
app.startup.assert_called_once_with()


def test_run_app_nondefault_host_port(loop, unused_port, mocker):
Expand All @@ -40,22 +44,26 @@ def test_run_app_nondefault_host_port(loop, unused_port, mocker):
loop.call_later(0.02, loop.stop)

app = web.Application(loop=loop)
mocker.spy(app, 'startup')

web.run_app(app, host=host, port=port, print=lambda *args: None)

assert loop.is_closed()
loop.create_server.assert_called_with(mock.ANY, host, port,
ssl=None, backlog=128)
app.startup.assert_called_once_with()


def test_run_app_custom_backlog(loop, mocker):
mocker.spy(loop, 'create_server')
loop.call_later(0.02, loop.stop)

app = web.Application(loop=loop)
mocker.spy(app, 'startup')

web.run_app(app, backlog=10, print=lambda *args: None)

assert loop.is_closed()
loop.create_server.assert_called_with(mock.ANY, '0.0.0.0', 8080,
ssl=None, backlog=10)
app.startup.assert_called_once_with()
45 changes: 45 additions & 0 deletions tests/test_web_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,48 @@ def on_shutdown(app_param):

yield from app.shutdown()
assert called


@pytest.mark.run_loop
def test_on_startup(loop):
app = web.Application(loop=loop)

blocking_called = False
long_running1_called = False
long_running2_called = False
all_long_running_called = False

def on_startup_blocking(app_param):
nonlocal blocking_called
assert app is app_param
blocking_called = True

@asyncio.coroutine
def long_running1(app_param):
nonlocal long_running1_called
assert app is app_param
long_running1_called = True

@asyncio.coroutine
def long_running2(app_param):
nonlocal long_running2_called
assert app is app_param
long_running2_called = True

@asyncio.coroutine
def on_startup_all_long_running(app_param):
nonlocal all_long_running_called
assert app is app_param
all_long_running_called = True
return (yield from asyncio.gather(long_running1(app_param),
long_running2(app_param),
loop=app_param.loop))

app.on_startup.append(on_startup_blocking)
app.on_startup.append(on_startup_all_long_running)

yield from app.startup()
assert blocking_called
assert long_running1_called
assert long_running2_called
assert all_long_running_called