Skip to content

Commit

Permalink
Use gunicorn to serve logs generated by worker (#17591)
Browse files Browse the repository at this point in the history
  • Loading branch information
mik-laj authored Aug 14, 2021
1 parent 6df3ee7 commit bd4f0c3
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 6 deletions.
41 changes: 37 additions & 4 deletions airflow/utils/serve_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@
import os
import time

import gunicorn.app.base
from flask import Flask, abort, request, send_from_directory
from itsdangerous import TimedJSONWebSignatureSerializer
from setproctitle import setproctitle

from airflow.configuration import conf


def flask_app():
flask_app = Flask(__name__)
def create_app():
flask_app = Flask(__name__, static_folder=None)
max_request_age = conf.getint('webserver', 'log_request_clock_grace', fallback=30)
log_directory = os.path.expanduser(conf.get('logging', 'BASE_LOG_FOLDER'))

Expand Down Expand Up @@ -73,10 +74,42 @@ def serve_logs_view(filename):
return flask_app


class StandaloneGunicornApplication(gunicorn.app.base.BaseApplication):
    """
    Embed Gunicorn in-process to serve an arbitrary WSGI application.

    This is a standalone Gunicorn application/server: instead of launching the
    ``gunicorn`` command line tool, the caller passes a WSGI callable and a
    dict of Gunicorn settings and then calls ``run()``.

    Code inspired by an example from the Gunicorn documentation.
    https://github.com/benoitc/gunicorn/blob/cf55d2cec277f220ebd605989ce78ad1bb553c46/examples/standalone_app.py
    For details, about standalone gunicorn application, see:
    https://docs.gunicorn.org/en/stable/custom.html

    :param app: the WSGI application to serve
    :param options: mapping of Gunicorn setting name to value; ``None`` or an
        empty mapping means no settings are applied by ``load_config``
    """

    def __init__(self, app, options=None):
        # Both attributes are assigned before the base initializer runs;
        # NOTE(review): per the Gunicorn custom-application docs the base
        # initializer triggers load_config(), which reads self.options.
        self.application = app
        self.options = options if options else {}
        super().__init__()

    def load_config(self):
        """Copy every entry of ``self.options`` into the Gunicorn config, lower-casing the setting name."""
        settings = self.cfg
        for name in self.options:
            settings.set(name.lower(), self.options[name])

    def load(self):
        """Return the WSGI application supplied to the constructor."""
        return self.application


def serve_logs():
"""Serves logs generated by Worker"""
# Set the process title to "airflow serve-logs".
setproctitle("airflow serve-logs")
# NOTE(review): this span is a rendered diff with removed and added lines interleaved;
# the flask_app() line below looks like the pre-change version of the create_app() line
# that follows it -- confirm against the repository.
app = flask_app()
wsgi_app = create_app()

# Port for the log server, read from the WORKER_LOG_SERVER_PORT option of the celery section.
worker_log_server_port = conf.getint('celery', 'WORKER_LOG_SERVER_PORT')
# NOTE(review): presumably the removed Flask app.run() call that the Gunicorn
# launch below replaces -- confirm against the repository.
app.run(host='0.0.0.0', port=worker_log_server_port)
# Gunicorn settings: bind to all interfaces on the configured port, with 2 workers.
options = {
'bind': f"0.0.0.0:{worker_log_server_port}",
'workers': 2,
}
# Wrap the WSGI app in the standalone Gunicorn application and start it.
StandaloneGunicornApplication(wsgi_app, options).run()


# Start the log server when this module is executed directly as a script.
if __name__ == '__main__':
serve_logs()
13 changes: 13 additions & 0 deletions docs/apache-airflow/logging-monitoring/logging-tasks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,16 @@ External Links
When using remote logging, users can configure Airflow to show a link to an external UI within the Airflow Web UI. Clicking the link redirects a user to the external UI.

Some external systems require specific configuration in Airflow for redirection to work but others do not.

Serving logs from workers
-------------------------

Most task handlers send logs upon completion of a task. In order to view logs in real time, Airflow automatically starts an HTTP server to serve the logs in the following cases:

- If ``SequentialExecutor`` or ``LocalExecutor`` is used, then when ``airflow scheduler`` is running.
- If ``CeleryExecutor`` is used, then when ``airflow celery worker`` is running.

The server runs on the port specified by the ``worker_log_server_port`` option in the ``[celery]`` section. By default, it is ``8793``.
Communication between the webserver and the worker is signed with the key specified by the ``secret_key`` option in the ``[webserver]`` section. You must ensure that the key is the same on the webserver and on every worker so that communication can take place without problems.

We are using `Gunicorn <https://gunicorn.org/>`__ as a WSGI server. Its configuration options can be overridden with the ``GUNICORN_CMD_ARGS`` env variable. For details, see `Gunicorn settings <https://docs.gunicorn.org/en/latest/settings.html#settings>`__.
4 changes: 2 additions & 2 deletions tests/utils/test_serve_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from itsdangerous import TimedJSONWebSignatureSerializer

from airflow.configuration import conf
from airflow.utils.serve_logs import flask_app
from airflow.utils.serve_logs import create_app
from tests.test_utils.config import conf_vars

if TYPE_CHECKING:
Expand All @@ -32,7 +32,7 @@
@pytest.fixture
def client(tmpdir):
with conf_vars({('logging', 'base_log_folder'): str(tmpdir)}):
app = flask_app()
app = create_app()

yield app.test_client()

Expand Down

0 comments on commit bd4f0c3

Please sign in to comment.