Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions airflow-core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ dependencies = [
# 0.115.10 fastapi was a bad release that broke our APIs and static checks.
# Related fastapi issue here: https://github.com/fastapi/fastapi/discussions/13431
"fastapi[standard]>=0.115.0,!=0.115.10",
# We could get rid of flask and gunicorn if we replace serve_logs with a starlette + uvicorn
"flask>=2.1.1",
"gitpython>=3.1.40",
# We could get rid of flask and gunicorn if we replace serve_logs with a starlette + uvicorn
"gunicorn>=20.1.0",
"httpx>=0.25.0",
'importlib_metadata>=6.5;python_version<"3.12"',
Expand Down
14 changes: 1 addition & 13 deletions airflow-core/src/airflow/cli/commands/scheduler_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
from airflow.executors.executor_loader import ExecutorLoader
from airflow.jobs.job import Job, run_job
from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
from airflow.providers.celery.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.utils import cli as cli_utils
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
from airflow.utils.scheduler_health import serve_health_check
Expand Down Expand Up @@ -60,18 +59,7 @@ def scheduler(args: Namespace):

@contextmanager
def _serve_logs(skip_serve_logs: bool = False):
"""Start serve_logs sub-process."""
if AIRFLOW_V_3_0_PLUS:
try:
from airflow.providers.fab.www.serve_logs import serve_logs
except ImportError:
raise ImportError(
"Celery requires FAB provider to be installed in order to run this command. "
"Please install the FAB provider by running: "
"pip install apache-airflow-providers-celery[fab]"
)
else:
from airflow.utils.serve_logs import serve_logs # type: ignore[no-redef]
from airflow.utils.serve_logs import serve_logs

sub_proc = None
executor_class, _ = ExecutorLoader.import_default_executor_cls()
Expand Down
13 changes: 1 addition & 12 deletions airflow-core/src/airflow/cli/commands/triggerer_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,14 @@
from airflow.configuration import conf
from airflow.jobs.job import Job, run_job
from airflow.jobs.triggerer_job_runner import TriggererJobRunner
from airflow.providers.celery.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.utils import cli as cli_utils
from airflow.utils.providers_configuration_loader import providers_configuration_loaded


@contextmanager
def _serve_logs(skip_serve_logs: bool = False) -> Generator[None, None, None]:
"""Start serve_logs sub-process."""
if AIRFLOW_V_3_0_PLUS:
try:
from airflow.providers.fab.www.serve_logs import serve_logs
except ImportError:
raise ImportError(
"Celery requires FAB provider to be installed in order to run this command. "
"Please install the FAB provider by running: "
"pip install apache-airflow-providers-celery[fab]"
)
else:
from airflow.utils.serve_logs import serve_logs # type: ignore[no-redef]
from airflow.utils.serve_logs import serve_logs

sub_proc = None
if skip_serve_logs is False:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import logging
import os
import socket
import sys
from collections import namedtuple

import gunicorn.app.base
Expand All @@ -32,7 +33,6 @@
InvalidIssuedAtError,
InvalidSignatureError,
)
from setproctitle import setproctitle
from werkzeug.exceptions import HTTPException

from airflow.api_fastapi.auth.tokens import JWTValidator, get_signing_key
Expand Down Expand Up @@ -169,6 +169,13 @@ def load(self):

def serve_logs(port=None):
"""Serve logs generated by Worker."""
# setproctitle causes issue on Mac OS: https://github.com/benoitc/gunicorn/issues/3021
os_type = sys.platform
if os_type == "darwin":
logger.debug("Mac OS detected, skipping setproctitle")
else:
from setproctitle import setproctitle

setproctitle("airflow serve-logs")
wsgi_app = create_app()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
from airflow.cli import cli_parser
from airflow.cli.commands import scheduler_command
from airflow.executors import executor_loader
from airflow.providers.fab.www.serve_logs import serve_logs
from airflow.utils.scheduler_health import serve_health_check
from airflow.utils.serve_logs import serve_logs

from tests_common.test_utils.config import conf_vars

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@

from airflow.api_fastapi.auth.tokens import JWTGenerator
from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
from airflow.providers.fab.www.serve_logs import create_app
from airflow.utils import timezone
from airflow.utils.serve_logs import create_app

from tests_common.test_utils.config import conf_vars

Expand Down Expand Up @@ -81,7 +81,7 @@ def sample_log(request, tmp_path):
base_log_dir = Path(DEFAULT_LOGGING_CONFIG["handlers"]["task"]["base_log_folder"])
else:
raise ValueError(f"Unknown client fixture: {client}")

base_log_dir.mkdir(exist_ok=True, parents=True)
f = base_log_dir.joinpath("sample.log")
f.write_text(LOG_DATA)
return f
Expand Down
2 changes: 1 addition & 1 deletion dev/breeze/tests/test_selective_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2157,7 +2157,7 @@ def test_upgrade_to_newer_dependencies(
),
pytest.param(
("providers/celery/src/airflow/providers/celery/file.py",),
{"docs-list-as-string": "celery cncf.kubernetes fab"},
{"docs-list-as-string": "celery cncf.kubernetes"},
id="Celery python files changed",
),
pytest.param(
Expand Down
1 change: 0 additions & 1 deletion providers/celery/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ You can install such cross-provider dependencies when installing from PyPI. For
Dependent package Extra
====================================================================================================================== ===================
`apache-airflow-providers-cncf-kubernetes <https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes>`_ ``cncf.kubernetes``
`apache-airflow-providers-fab <https://airflow.apache.org/docs/apache-airflow-providers-fab>`_ ``fab``
====================================================================================================================== ===================

The changelog for the provider package can be found in the
Expand Down
4 changes: 0 additions & 4 deletions providers/celery/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,13 @@ dependencies = [
"cncf.kubernetes" = [
"apache-airflow-providers-cncf-kubernetes>=7.4.0",
]
"fab" = [
"apache-airflow-providers-fab>=2.0.0",
]

[dependency-groups]
dev = [
"apache-airflow",
"apache-airflow-task-sdk",
"apache-airflow-devel-common",
"apache-airflow-providers-cncf-kubernetes",
"apache-airflow-providers-fab",
# Additional devel dependencies (do not remove this line and add extra development dependencies)
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,17 +107,7 @@ def flower(args):
@contextmanager
def _serve_logs(skip_serve_logs: bool = False):
"""Start serve_logs sub-process."""
if AIRFLOW_V_3_0_PLUS:
try:
from airflow.providers.fab.www.serve_logs import serve_logs
except ImportError:
raise ImportError(
"Celery requires FAB provider to be installed in order to run this command. "
"Please install the FAB provider by running: "
"pip install apache-airflow-providers-celery[fab]"
)
else:
from airflow.utils.serve_logs import serve_logs # type: ignore[no-redef]
from airflow.utils.serve_logs import serve_logs

sub_proc = None
if skip_serve_logs is False:
Expand Down