Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 0 additions & 17 deletions airflow-core/newsfragments/52860.significant.rst

This file was deleted.

10 changes: 5 additions & 5 deletions airflow-core/src/airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -618,10 +618,10 @@ def string_lower_type(val):
default=conf.get("api", "host"),
help="Set the host on which to run the API server",
)
ARG_API_SERVER_LOG_CONFIG = Arg(
("--log-config",),
default=conf.get("api", "log_config", fallback=None),
help="(Optional) Path to the logging configuration file for the uvicorn server. If not set, the default uvicorn logging configuration will be used.",
ARG_API_SERVER_ACCESS_LOGFILE = Arg(
("-A", "--access-logfile"),
default=conf.get("api", "access_logfile"),
help="The logfile to store the access log. Use '-' to print to stdout",
)
ARG_API_SERVER_APPS = Arg(
("--apps",),
Expand Down Expand Up @@ -1795,7 +1795,7 @@ class GroupCommand(NamedTuple):
ARG_DAEMON,
ARG_STDOUT,
ARG_STDERR,
ARG_API_SERVER_LOG_CONFIG,
ARG_API_SERVER_ACCESS_LOGFILE,
ARG_API_SERVER_APPS,
ARG_LOG_FILE,
ARG_SSL_CERT,
Expand Down
96 changes: 32 additions & 64 deletions airflow-core/src/airflow/cli/commands/api_server_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@
import logging
import os
import subprocess
import sys
import textwrap
from collections.abc import Callable
from functools import wraps
from typing import TYPE_CHECKING, TypeVar

import uvicorn
from gunicorn.util import daemonize
from setproctitle import setproctitle

from airflow import settings
from airflow.cli.commands.daemon_utils import run_command_with_daemon_option
from airflow.exceptions import AirflowConfigException
from airflow.typing_compat import ParamSpec
from airflow.utils import cli as cli_utils
Expand All @@ -50,55 +50,6 @@
# more info here: https://github.com/benoitc/gunicorn/issues/1877#issuecomment-1911136399


def _run_api_server(args, apps: str, num_workers: int, worker_timeout: int, proxy_headers: bool):
"""Run the API server."""
log.info(
textwrap.dedent(
f"""\
Running the uvicorn with:
Apps: {apps}
Workers: {num_workers}
Host: {args.host}:{args.port}
Timeout: {worker_timeout}
Logfiles: {args.log_file or "-"}
================================================================="""
)
)
# get ssl cert and key filepaths here instead of passing them as arguments to reduce the number of arguments
ssl_cert, ssl_key = _get_ssl_cert_and_key_filepaths(args)

# setproctitle causes issue on Mac OS: https://github.com/benoitc/gunicorn/issues/3021
os_type = sys.platform
if os_type == "darwin":
log.debug("Mac OS detected, skipping setproctitle")
else:
from setproctitle import setproctitle

setproctitle(f"airflow api_server -- host:{args.host} port:{args.port}")

uvicorn_kwargs = {
"host": args.host,
"port": args.port,
"workers": num_workers,
"timeout_keep_alive": worker_timeout,
"timeout_graceful_shutdown": worker_timeout,
"ssl_keyfile": ssl_key,
"ssl_certfile": ssl_cert,
"access_log": True,
"proxy_headers": proxy_headers,
}
# Only set the log_config if it is provided, otherwise use the default uvicorn logging configuration.
if args.log_config and args.log_config != "-":
# The [api/log_config] is migrated from [api/access_logfile] and [api/access_logfile] defaults to "-" for stdout for Gunicorn.
# So we need to check if the log_config is set to "-" or not; if it is set to "-", we regard it as not set.
uvicorn_kwargs["log_config"] = args.log_config

uvicorn.run(
"airflow.api_fastapi.main:app",
**uvicorn_kwargs,
)


def with_api_apps_env(func: Callable[[Namespace], RT]) -> Callable[[Namespace], RT]:
"""We use AIRFLOW_API_APPS to specify which apps are initialized in the API server."""

Expand Down Expand Up @@ -129,6 +80,7 @@ def api_server(args: Namespace):
print(settings.HEADER)

apps = args.apps
access_logfile = args.access_logfile or "-"
num_workers = args.workers
worker_timeout = args.worker_timeout
proxy_headers = args.proxy_headers
Expand All @@ -155,25 +107,41 @@ def api_server(args: Namespace):
if args.proxy_headers:
run_args.append("--proxy-headers")

if args.log_config and args.log_config != "-":
run_args.extend(["--log-config", args.log_config])

with subprocess.Popen(
run_args,
close_fds=True,
) as process:
process.wait()
else:
run_command_with_daemon_option(
args=args,
process_name="api_server",
callback=lambda: _run_api_server(
args=args,
apps=apps,
num_workers=num_workers,
worker_timeout=worker_timeout,
proxy_headers=proxy_headers,
),
if args.daemon:
daemonize()
log.info("Daemonized the API server process PID: %s", os.getpid())

log.info(
textwrap.dedent(
f"""\
Running the uvicorn with:
Apps: {apps}
Workers: {num_workers}
Host: {args.host}:{args.port}
Timeout: {worker_timeout}
Logfiles: {access_logfile}
================================================================="""
)
)
ssl_cert, ssl_key = _get_ssl_cert_and_key_filepaths(args)
setproctitle(f"airflow api_server -- host:{args.host} port:{args.port}")
uvicorn.run(
"airflow.api_fastapi.main:app",
host=args.host,
port=args.port,
workers=num_workers,
timeout_keep_alive=worker_timeout,
timeout_graceful_shutdown=worker_timeout,
ssl_keyfile=ssl_key,
ssl_certfile=ssl_cert,
access_log=access_logfile, # type: ignore
proxy_headers=proxy_headers,
)


Expand Down
9 changes: 4 additions & 5 deletions airflow-core/src/airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1391,14 +1391,13 @@ api:
type: integer
example: ~
default: "120"
log_config:
access_logfile:
description: |
Path to the logging configuration file for the uvicorn server.
If not set, the default uvicorn logging configuration will be used.
Log files for the api server. '-' means log to stderr.
version_added: ~
type: string
example: path/to/logging_config.yaml
default: ~
example: ~
default: "-"
ssl_cert:
description: |
Paths to the SSL certificate and key for the api server. When both are
Expand Down
1 change: 0 additions & 1 deletion airflow-core/src/airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,6 @@ def sensitive_config_values(self) -> set[tuple[str, str]]:
("api", "secret_key"): ("webserver", "secret_key", "3.0.2"),
("api", "enable_swagger_ui"): ("webserver", "enable_swagger_ui", "3.0.2"),
("dag_processor", "parsing_pre_import_modules"): ("scheduler", "parsing_pre_import_modules", "3.0.3"),
("api", "log_config"): ("api", "access_logfile", "3.0.4"),
}

# A mapping of new section -> (old section, since_version).
Expand Down
16 changes: 8 additions & 8 deletions airflow-core/tests/unit/cli/commands/_common_cli_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
console = Console(width=400, color_system="standard")


class _CommonCLIUvicornTestClass:
class _CommonCLIGunicornTestClass:
main_process_regexp: str = "process_to_look_for"

@pytest.fixture(autouse=True)
Expand All @@ -49,12 +49,12 @@ def _check_processes(self, ignore_running: bool):
# Confirm that nmain procss hasn't been launched.
# pgrep returns exit status 1 if no process matched.
# Use more specific regexps (^) to avoid matching pytest run when running specific method.
# For instance, we want to be able to do: pytest -k 'uvicorn'
# For instance, we want to be able to do: pytest -k 'gunicorn'
airflow_internal_api_pids = self._find_all_processes(self.main_process_regexp)
uvicorn_pids = self._find_all_processes(r"uvicorn: ")
if airflow_internal_api_pids or uvicorn_pids:
gunicorn_pids = self._find_all_processes(r"gunicorn: ")
if airflow_internal_api_pids or gunicorn_pids:
console.print("[blue]Some processes are still running")
for pid in uvicorn_pids + airflow_internal_api_pids:
for pid in gunicorn_pids + airflow_internal_api_pids:
with suppress(NoSuchProcess):
console.print(psutil.Process(pid).as_dict(attrs=["pid", "name", "cmdline"]))
console.print("[blue]Here list of processes ends")
Expand All @@ -63,9 +63,9 @@ def _check_processes(self, ignore_running: bool):
for pid in airflow_internal_api_pids:
with suppress(NoSuchProcess):
psutil.Process(pid).kill()
if uvicorn_pids:
console.print("[yellow]Forcefully killing all uvicorn processes")
for pid in uvicorn_pids:
if gunicorn_pids:
console.print("[yellow]Forcefully killing all gunicorn processes")
for pid in gunicorn_pids:
with suppress(NoSuchProcess):
psutil.Process(pid).kill()
if not ignore_running:
Expand Down
Loading