diff --git a/airflow-core/src/airflow/executors/local_executor.py b/airflow-core/src/airflow/executors/local_executor.py index 6ab00cced3238..2ba7635d306a6 100644 --- a/airflow-core/src/airflow/executors/local_executor.py +++ b/airflow-core/src/airflow/executors/local_executor.py @@ -30,16 +30,23 @@ import multiprocessing import multiprocessing.sharedctypes import os +import sys from multiprocessing import Queue, SimpleQueue from typing import TYPE_CHECKING -from setproctitle import setproctitle - from airflow.executors import workloads from airflow.executors.base_executor import PARALLELISM, BaseExecutor from airflow.utils.session import NEW_SESSION, provide_session from airflow.utils.state import TaskInstanceState +# add logger to parameter of setproctitle to support logging +if sys.platform == "darwin": + setproctitle = lambda title, logger: logger.debug("Mac OS detected, skipping setproctitle") +else: + from setproctitle import setproctitle as real_setproctitle + + setproctitle = lambda title, logger: real_setproctitle(title) + if TYPE_CHECKING: from sqlalchemy.orm import Session @@ -61,7 +68,7 @@ def _run_worker( log.info("Worker starting up pid=%d", os.getpid()) while True: - setproctitle("airflow worker -- LocalExecutor: ") + setproctitle("airflow worker -- LocalExecutor: ", log) try: workload = input.get() except EOFError: @@ -107,7 +114,7 @@ def _execute_work(log: logging.Logger, workload: workloads.ExecuteTask) -> None: from airflow.configuration import conf from airflow.sdk.execution_time.supervisor import supervise - setproctitle(f"airflow worker -- LocalExecutor: {workload.ti.id}") + setproctitle(f"airflow worker -- LocalExecutor: {workload.ti.id}", log) base_url = conf.get("api", "base_url", fallback="/") # If it's a relative URL, use localhost:8080 as the default diff --git a/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py b/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py index 320e749863c60..76202dd139f33 100644 --- a/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py +++ b/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py @@ -38,7 +38,6 @@ from celery.backends.base import BaseKeyValueStoreBackend from celery.backends.database import DatabaseBackend, Task as TaskDb, retry, session_cleanup from celery.signals import import_modules as celery_import_modules -from setproctitle import setproctitle from sqlalchemy import select import airflow.settings as settings @@ -58,6 +57,11 @@ log = logging.getLogger(__name__) +if sys.platform == "darwin": + setproctitle = lambda title: log.debug("Mac OS detected, skipping setproctitle") +else: + from setproctitle import setproctitle + if TYPE_CHECKING: from typing import TypeAlias diff --git a/providers/edge3/src/airflow/providers/edge3/cli/worker.py b/providers/edge3/src/airflow/providers/edge3/cli/worker.py index 3d4db279a46a2..a387670ae9932 100644 --- a/providers/edge3/src/airflow/providers/edge3/cli/worker.py +++ b/providers/edge3/src/airflow/providers/edge3/cli/worker.py @@ -19,6 +19,7 @@ import logging import os import signal +import sys from datetime import datetime from http import HTTPStatus from multiprocessing import Process @@ -60,6 +61,11 @@ logger = logging.getLogger(__name__) +if sys.platform == "darwin": + setproctitle = lambda title: logger.debug("Mac OS detected, skipping setproctitle") +else: + from setproctitle import setproctitle + def _edge_hostname() -> str: """Get the hostname of the edge worker that should be reported by tasks.""" @@ -176,8 +182,6 @@ def _launch_job_af3(self, edge_job: EdgeJobFetched) -> tuple[Process, Path]: def _run_job_via_supervisor( workload: ExecuteTask, ) -> int: - from setproctitle import setproctitle - from airflow.sdk.execution_time.supervisor import supervise # Ignore ctrl-c in this process -- we don't want to kill _this_ one. we let tasks run to completion diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py index f450942bbab34..49f3246077453 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py +++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py @@ -18,13 +18,13 @@ import logging import os +import sys from concurrent.futures import ProcessPoolExecutor from datetime import datetime from typing import TYPE_CHECKING import psutil from openlineage.client.serde import Serde -from setproctitle import getproctitle, setproctitle from airflow import settings from airflow.listeners import hookimpl @@ -57,6 +57,13 @@ from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance from airflow.settings import Session +if sys.platform == "darwin": + from setproctitle import getproctitle + + setproctitle = lambda title: logging.getLogger(__name__).debug("Mac OS detected, skipping setproctitle") +else: + from setproctitle import getproctitle, setproctitle + _openlineage_listener: OpenLineageListener | None = None