Use gunicorn to serve logs generated by worker #17591

Merged
merged 6 commits into from
Aug 14, 2021
41 changes: 37 additions & 4 deletions airflow/utils/serve_logs.py
@@ -19,15 +19,16 @@
 import os
 import time

+import gunicorn.app.base
 from flask import Flask, abort, request, send_from_directory
 from itsdangerous import TimedJSONWebSignatureSerializer
 from setproctitle import setproctitle

 from airflow.configuration import conf


-def flask_app():
-    flask_app = Flask(__name__)
+def create_app():
+    flask_app = Flask(__name__, static_folder=None)
Member Author: We don't support this feature (Flask's static file serving), so we can safely turn it off.

     max_request_age = conf.getint('webserver', 'log_request_clock_grace', fallback=30)
     log_directory = os.path.expanduser(conf.get('logging', 'BASE_LOG_FOLDER'))

@@ -73,10 +74,42 @@ def serve_logs_view(filename):
     return flask_app


+class StandaloneGunicornApplication(gunicorn.app.base.BaseApplication):
+    """
+    Standalone Gunicorn application/server for usage with any WSGI application.
+
+    Code inspired by an example from the Gunicorn documentation.
+    https://github.com/benoitc/gunicorn/blob/cf55d2cec277f220ebd605989ce78ad1bb553c46/examples/standalone_app.py
+
+    For details about standalone Gunicorn applications, see:
+    https://docs.gunicorn.org/en/stable/custom.html
+    """
+
+    def __init__(self, app, options=None):
+        self.options = options or {}
+        self.application = app
+        super().__init__()
+
+    def load_config(self):
+        for key, value in self.options.items():
+            self.cfg.set(key.lower(), value)
+
+    def load(self):
+        return self.application
+
+
 def serve_logs():
     """Serves logs generated by Worker"""
     setproctitle("airflow serve-logs")
-    app = flask_app()
+    wsgi_app = create_app()

     worker_log_server_port = conf.getint('celery', 'WORKER_LOG_SERVER_PORT')
-    app.run(host='0.0.0.0', port=worker_log_server_port)
+    options = {
+        'bind': f"0.0.0.0:{worker_log_server_port}",
+        'workers': 2,
Member: Do we need 2 workers, or can we get away with just 1?

Member Author: 1 should work too, but I configured 2 so that one can take over if the other has problems.

Member: Yeah, 2 is better just in case of memory errors/crashes. Just one small nit (in case someone has problems with memory usage etc.): I think it would be great to mention in the worker docs that we are using Gunicorn and that the configuration options can be overridden by the GUNICORN_CMD_ARGS env variable: https://docs.gunicorn.org/en/latest/settings.html#settings

It's not at all obvious from the docs, without looking at the code, that we now have separate Gunicorn processes forked and that you can configure their behaviour via env vars.

Member Author: Good point. Added docs.

+    }
+    StandaloneGunicornApplication(wsgi_app, options).run()
+
+
+if __name__ == '__main__':
+    serve_logs()
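
As a rough illustration of how `serve_logs()` above assembles its Gunicorn options, the following sketch reads the port from an in-memory config and builds the same `bind`/`workers` dictionary. It is only a sketch under stated assumptions: it uses the standard-library `configparser` rather than `airflow.configuration.conf`, and `SAMPLE_CONFIG` and `build_gunicorn_options` are hypothetical names introduced for this example.

```python
import configparser

# Hypothetical stand-in for airflow.cfg; only the option read by serve_logs() is shown.
SAMPLE_CONFIG = """
[celery]
worker_log_server_port = 8793
"""


def build_gunicorn_options(config_text: str) -> dict:
    """Build an options dict shaped like the one passed to StandaloneGunicornApplication."""
    parser = configparser.ConfigParser()
    parser.read_string(config_text)
    port = parser.getint("celery", "worker_log_server_port")
    return {
        "bind": f"0.0.0.0:{port}",  # listen on all interfaces, as in the diff
        "workers": 2,  # two workers, so one can take over if the other fails
    }


if __name__ == "__main__":
    print(build_gunicorn_options(SAMPLE_CONFIG))
```

Keeping the option keys identical to Gunicorn setting names is what lets `load_config()` in the diff pass them straight to `self.cfg.set()`.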
13 changes: 13 additions & 0 deletions docs/apache-airflow/logging-monitoring/logging-tasks.rst
@@ -107,3 +107,16 @@ External Links
When using remote logging, users can configure Airflow to show a link to an external UI within the Airflow Web UI. Clicking the link redirects a user to the external UI.

Some external systems require specific configuration in Airflow for redirection to work but others do not.

Member: ❤️

Serving logs from workers
-------------------------

Most task handlers send logs upon completion of a task. In order to view logs in real time, Airflow automatically starts an HTTP server to serve the logs in the following cases:

- If ``SequentialExecutor`` or ``LocalExecutor`` is used, then when ``airflow scheduler`` is running.
- If ``CeleryExecutor`` is used, then when ``airflow celery worker`` is running.

The server runs on the port specified by the ``worker_log_server_port`` option in the ``celery`` section. By default, it is ``8793``.
Communication between the webserver and the worker is signed with the key specified by the ``secret_key`` option in the ``webserver`` section. You must ensure that the key matches on both sides so that communication can take place without problems.

We are using `Gunicorn <https://gunicorn.org/>`__ as a WSGI server. Its configuration options can be overridden with the ``GUNICORN_CMD_ARGS`` env variable. For details, see `Gunicorn settings <https://docs.gunicorn.org/en/latest/settings.html#settings>`__.
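
To illustrate why the key must be the same on the webserver and on the worker, here is a minimal sketch of a signed, time-limited request token. It is not Airflow's implementation: the code in this PR relies on `itsdangerous`, whereas this sketch swaps in the standard-library `hmac` module. The names `sign_request` and `verify_request` and the token layout are assumptions made for the example; the 30-second limit mirrors the `log_request_clock_grace` fallback shown in the diff.

```python
import hashlib
import hmac
import time

MAX_REQUEST_AGE = 30  # seconds; mirrors the log_request_clock_grace fallback


def sign_request(secret_key: str, filename: str, issued_at: int) -> str:
    """Return a token that binds the requested filename to its issue time."""
    payload = f"{filename}|{issued_at}"
    digest = hmac.new(secret_key.encode(), payload.encode(), hashlib.sha256).hexdigest()
    return f"{payload}|{digest}"


def verify_request(secret_key: str, token: str, filename: str, now: int) -> bool:
    """Accept the token only if the signature, filename, and age all check out."""
    try:
        payload, digest = token.rsplit("|", 1)
        signed_name, issued_at = payload.rsplit("|", 1)
        issued = int(issued_at)
    except ValueError:
        return False  # malformed token
    expected = hmac.new(secret_key.encode(), payload.encode(), hashlib.sha256).hexdigest()
    if not hmac.compare_digest(digest.encode(), expected.encode()):
        return False  # signed with a different key, or tampered with
    if signed_name != filename:
        return False  # token was issued for another log file
    return abs(now - issued) <= MAX_REQUEST_AGE


if __name__ == "__main__":
    now = int(time.time())
    token = sign_request("shared-secret", "dag/task/1.log", now)
    print(verify_request("shared-secret", token, "dag/task/1.log", now))
    print(verify_request("other-secret", token, "dag/task/1.log", now))
```

A token signed with one key fails verification under any other key, which is why a mismatched `secret_key` prevents the webserver from fetching logs from the worker.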
4 changes: 2 additions & 2 deletions tests/utils/test_serve_logs.py
@@ -20,7 +20,7 @@
 from itsdangerous import TimedJSONWebSignatureSerializer

 from airflow.configuration import conf
-from airflow.utils.serve_logs import flask_app
+from airflow.utils.serve_logs import create_app
 from tests.test_utils.config import conf_vars

 if TYPE_CHECKING:
@@ -32,7 +32,7 @@
 @pytest.fixture
 def client(tmpdir):
     with conf_vars({('logging', 'base_log_folder'): str(tmpdir)}):
-        app = flask_app()
+        app = create_app()

         yield app.test_client()
