Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 34 additions & 6 deletions airflow-core/src/airflow/cli/commands/api_server_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,27 @@
import subprocess
import sys
import textwrap
from collections.abc import Callable
from functools import wraps
from typing import TYPE_CHECKING, TypeVar

import uvicorn

from airflow import settings
from airflow.cli.commands.daemon_utils import run_command_with_daemon_option
from airflow.exceptions import AirflowConfigException
from airflow.typing_compat import ParamSpec
from airflow.utils import cli as cli_utils
from airflow.utils.providers_configuration_loader import providers_configuration_loaded

PS = ParamSpec("PS")
RT = TypeVar("RT")
AIRFLOW_API_APPS = "AIRFLOW_API_APPS"

log = logging.getLogger(__name__)

if TYPE_CHECKING:
from argparse import Namespace

# This shouldn't be necessary but there seems to be an issue in uvloop that causes bad file descriptor
# errors when shutting down workers. Despite the 'closed' status of the issue it is not solved,
Expand Down Expand Up @@ -89,9 +99,32 @@ def _run_api_server(args, apps: str, num_workers: int, worker_timeout: int, prox
)


def with_api_apps_env(func: Callable[[Namespace], RT]) -> Callable[[Namespace], RT]:
"""We use AIRFLOW_API_APPS to specify which apps are initialized in the API server."""

@wraps(func)
def wrapper(args: Namespace) -> RT:
apps: str = args.apps
original_value = os.environ.get(AIRFLOW_API_APPS)
try:
log.debug("Setting AIRFLOW_API_APPS to: %s", apps)
os.environ[AIRFLOW_API_APPS] = apps
return func(args)
finally:
if original_value is not None:
os.environ[AIRFLOW_API_APPS] = original_value
log.debug("Restored AIRFLOW_API_APPS to: %s", original_value)
else:
os.environ.pop(AIRFLOW_API_APPS, None)
log.debug("Removed AIRFLOW_API_APPS from environment")

return wrapper


@cli_utils.action_cli
@providers_configuration_loaded
def api_server(args):
@with_api_apps_env
def api_server(args: Namespace):
"""Start Airflow API server."""
print(settings.HEADER)

Expand Down Expand Up @@ -125,16 +158,11 @@ def api_server(args):
if args.log_config and args.log_config != "-":
run_args.extend(["--log-config", args.log_config])

# There is no way to pass the apps to airflow/api_fastapi/main.py in the development mode
# because fastapi dev command does not accept any additional arguments
# so environment variable is being used to pass it
os.environ["AIRFLOW_API_APPS"] = apps
with subprocess.Popen(
run_args,
close_fds=True,
) as process:
process.wait()
os.environ.pop("AIRFLOW_API_APPS")
else:
run_command_with_daemon_option(
args=args,
Expand Down
79 changes: 47 additions & 32 deletions airflow-core/tests/unit/cli/commands/test_api_server_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,49 +101,64 @@ def test_dev_arg(self, args, expected_command):
close_fds=True,
)

def test_apps_env_var_set_unset(self):
@pytest.mark.parametrize(
"args",
[
(["api-server"]),
(["api-server", "--apps", "all"]),
(["api-server", "--apps", "core,execution"]),
(["api-server", "--apps", "core"]),
(["api-server", "--apps", "execution"]),
],
ids=[
"default_apps",
"all_apps_explicit",
"multiple_apps_explicit",
"single_app_core",
"single_app_execution",
],
)
@pytest.mark.parametrize("dev_mode", [True, False])
@pytest.mark.parametrize(
"original_env",
[None, "some_value"],
)
def test_api_apps_env(self, args, dev_mode, original_env):
"""
Test that AIRFLOW_API_APPS is set and unset in the environment when
calling the airflow api-server command
"""
expected_setitem_calls = []

if dev_mode:
args.append("--dev")

with (
mock.patch("subprocess.Popen") as Popen,
mock.patch("os.environ", autospec=True) as mock_environ,
mock.patch("uvicorn.run"),
mock.patch("subprocess.Popen"),
):
apps_value = "core,execution"
port = "9092"
host = "somehost"
# Mock the environment variable with initial value or None
mock_environ.get.return_value = original_env

# Parse the command line arguments
args = self.parser.parse_args(
["api-server", "--port", port, "--host", host, "--apps", apps_value, "--dev"]
)

# Ensure AIRFLOW_API_APPS is not set initially
mock_environ.get.return_value = None
# Parse the command line arguments and call the api_server command
parsed_args = self.parser.parse_args(args)
api_server_command.api_server(parsed_args)

# Call the fastapi_api command
api_server_command.api_server(args)

# Assert that AIRFLOW_API_APPS was set in the environment before subprocess
mock_environ.__setitem__.assert_called_with("AIRFLOW_API_APPS", apps_value)
# Verify the AIRFLOW_API_APPS was set correctly
if "--apps" in args:
expected_setitem_calls.append(
mock.call(api_server_command.AIRFLOW_API_APPS, parsed_args.apps)
)

# Simulate subprocess execution
Popen.assert_called_with(
[
"fastapi",
"dev",
"airflow-core/src/airflow/api_fastapi/main.py",
"--port",
port,
"--host",
host,
],
close_fds=True,
)
# Verify AIRFLOW_API_APPS was cleaned up
if original_env is not None:
expected_setitem_calls.append(mock.call(api_server_command.AIRFLOW_API_APPS, original_env))
else:
mock_environ.pop.assert_called_with(api_server_command.AIRFLOW_API_APPS, None)

# Assert that AIRFLOW_API_APPS was unset after subprocess
mock_environ.pop.assert_called_with("AIRFLOW_API_APPS")
# Verify that the environment variable was set and cleaned up correctly
mock_environ.__setitem__.assert_has_calls(expected_setitem_calls)

@pytest.mark.parametrize(
"cli_args, expected_additional_kwargs",
Expand Down