diff --git a/airflow-core/src/airflow/cli/commands/api_server_command.py b/airflow-core/src/airflow/cli/commands/api_server_command.py index 5a8e1fb86411c..2e0744be5b8dd 100644 --- a/airflow-core/src/airflow/cli/commands/api_server_command.py +++ b/airflow-core/src/airflow/cli/commands/api_server_command.py @@ -23,17 +23,27 @@ import subprocess import sys import textwrap +from collections.abc import Callable +from functools import wraps +from typing import TYPE_CHECKING, TypeVar import uvicorn from airflow import settings from airflow.cli.commands.daemon_utils import run_command_with_daemon_option from airflow.exceptions import AirflowConfigException +from airflow.typing_compat import ParamSpec from airflow.utils import cli as cli_utils from airflow.utils.providers_configuration_loader import providers_configuration_loaded +PS = ParamSpec("PS") +RT = TypeVar("RT") +AIRFLOW_API_APPS = "AIRFLOW_API_APPS" + log = logging.getLogger(__name__) +if TYPE_CHECKING: + from argparse import Namespace # This shouldn't be necessary but there seems to be an issue in uvloop that causes bad file descriptor # errors when shutting down workers. Despite the 'closed' status of the issue it is not solved, @@ -89,9 +99,32 @@ def _run_api_server(args, apps: str, num_workers: int, worker_timeout: int, prox ) +def with_api_apps_env(func: Callable[[Namespace], RT]) -> Callable[[Namespace], RT]: + """We use AIRFLOW_API_APPS to specify which apps are initialized in the API server.""" + + @wraps(func) + def wrapper(args: Namespace) -> RT: + apps: str = args.apps + original_value = os.environ.get(AIRFLOW_API_APPS) + try: + log.debug("Setting AIRFLOW_API_APPS to: %s", apps) + os.environ[AIRFLOW_API_APPS] = apps + return func(args) + finally: + if original_value is not None: + os.environ[AIRFLOW_API_APPS] = original_value + log.debug("Restored AIRFLOW_API_APPS to: %s", original_value) + else: + os.environ.pop(AIRFLOW_API_APPS, None) + log.debug("Removed AIRFLOW_API_APPS from environment") + + return wrapper + + @cli_utils.action_cli @providers_configuration_loaded -def api_server(args): +@with_api_apps_env +def api_server(args: Namespace): """Start Airflow API server.""" print(settings.HEADER) @@ -125,16 +158,11 @@ def api_server(args): if args.log_config and args.log_config != "-": run_args.extend(["--log-config", args.log_config]) - # There is no way to pass the apps to airflow/api_fastapi/main.py in the development mode - # because fastapi dev command does not accept any additional arguments - # so environment variable is being used to pass it - os.environ["AIRFLOW_API_APPS"] = apps with subprocess.Popen( run_args, close_fds=True, ) as process: process.wait() - os.environ.pop("AIRFLOW_API_APPS") else: run_command_with_daemon_option( args=args, diff --git a/airflow-core/tests/unit/cli/commands/test_api_server_command.py b/airflow-core/tests/unit/cli/commands/test_api_server_command.py index 89689d3d9ba85..2f3dec30f3dbb 100644 --- a/airflow-core/tests/unit/cli/commands/test_api_server_command.py +++ b/airflow-core/tests/unit/cli/commands/test_api_server_command.py @@ -101,49 +101,64 @@ def test_dev_arg(self, args, expected_command): close_fds=True, ) - def test_apps_env_var_set_unset(self): + @pytest.mark.parametrize( + "args", + [ + (["api-server"]), + (["api-server", "--apps", "all"]), + (["api-server", "--apps", "core,execution"]), + (["api-server", "--apps", "core"]), + (["api-server", "--apps", "execution"]), + ], + ids=[ + "default_apps", + "all_apps_explicit", + "multiple_apps_explicit", + "single_app_core", + "single_app_execution", + ], + ) + @pytest.mark.parametrize("dev_mode", [True, False]) + @pytest.mark.parametrize( + "original_env", + [None, "some_value"], + ) + def test_api_apps_env(self, args, dev_mode, original_env): """ Test that AIRFLOW_API_APPS is set and unset in the environment when calling the airflow api-server command """ + expected_setitem_calls = [] + + if dev_mode: + args.append("--dev") + with ( - mock.patch("subprocess.Popen") as Popen, mock.patch("os.environ", autospec=True) as mock_environ, + mock.patch("uvicorn.run"), + mock.patch("subprocess.Popen"), ): - apps_value = "core,execution" - port = "9092" - host = "somehost" + # Mock the environment variable with initial value or None + mock_environ.get.return_value = original_env - # Parse the command line arguments - args = self.parser.parse_args( - ["api-server", "--port", port, "--host", host, "--apps", apps_value, "--dev"] - ) - - # Ensure AIRFLOW_API_APPS is not set initially - mock_environ.get.return_value = None + # Parse the command line arguments and call the api_server command + parsed_args = self.parser.parse_args(args) + api_server_command.api_server(parsed_args) - # Call the fastapi_api command - api_server_command.api_server(args) - - # Assert that AIRFLOW_API_APPS was set in the environment before subprocess - mock_environ.__setitem__.assert_called_with("AIRFLOW_API_APPS", apps_value) + # Verify the AIRFLOW_API_APPS was set correctly + if "--apps" in args: + expected_setitem_calls.append( + mock.call(api_server_command.AIRFLOW_API_APPS, parsed_args.apps) + ) - # Simulate subprocess execution - Popen.assert_called_with( - [ - "fastapi", - "dev", - "airflow-core/src/airflow/api_fastapi/main.py", - "--port", - port, - "--host", - host, - ], - close_fds=True, - ) + # Verify AIRFLOW_API_APPS was cleaned up + if original_env is not None: + expected_setitem_calls.append(mock.call(api_server_command.AIRFLOW_API_APPS, original_env)) + else: + mock_environ.pop.assert_called_with(api_server_command.AIRFLOW_API_APPS, None) - # Assert that AIRFLOW_API_APPS was unset after subprocess - mock_environ.pop.assert_called_with("AIRFLOW_API_APPS") + # Verify that the environment variable was set and cleaned up correctly + mock_environ.__setitem__.assert_has_calls(expected_setitem_calls) @pytest.mark.parametrize( "cli_args, expected_additional_kwargs",