Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions airflow-core/src/airflow/executors/local_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,23 @@
import multiprocessing
import multiprocessing.sharedctypes
import os
import sys
from multiprocessing import Queue, SimpleQueue
from typing import TYPE_CHECKING

from setproctitle import setproctitle

from airflow.executors import workloads
from airflow.executors.base_executor import PARALLELISM, BaseExecutor
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import TaskInstanceState

# Provide a platform-dependent ``setproctitle(title, logger)`` wrapper.
# NOTE(review): presumably the real setproctitle misbehaves on macOS in
# forked worker processes -- confirm the rationale in the PR description.
# The wrapper takes a logger so the skip is visible at debug level; on
# other platforms the logger argument is accepted but unused so that
# callers can use one call signature everywhere.
if sys.platform == "darwin":

    def setproctitle(title, logger):
        """No-op replacement for setproctitle on macOS; logs the skipped call."""
        logger.debug("Mac OS detected, skipping setproctitle")

else:
    from setproctitle import setproctitle as real_setproctitle

    def setproctitle(title, logger):
        """Set the process title via the real setproctitle (logger unused)."""
        real_setproctitle(title)

if TYPE_CHECKING:
from sqlalchemy.orm import Session

Expand All @@ -61,7 +68,7 @@ def _run_worker(
log.info("Worker starting up pid=%d", os.getpid())

while True:
setproctitle("airflow worker -- LocalExecutor: <idle>")
setproctitle("airflow worker -- LocalExecutor: <idle>", log)
try:
workload = input.get()
except EOFError:
Expand Down Expand Up @@ -107,7 +114,7 @@ def _execute_work(log: logging.Logger, workload: workloads.ExecuteTask) -> None:
from airflow.configuration import conf
from airflow.sdk.execution_time.supervisor import supervise

setproctitle(f"airflow worker -- LocalExecutor: {workload.ti.id}")
setproctitle(f"airflow worker -- LocalExecutor: {workload.ti.id}", log)

base_url = conf.get("api", "base_url", fallback="/")
# If it's a relative URL, use localhost:8080 as the default
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
from celery.backends.base import BaseKeyValueStoreBackend
from celery.backends.database import DatabaseBackend, Task as TaskDb, retry, session_cleanup
from celery.signals import import_modules as celery_import_modules
from setproctitle import setproctitle
from sqlalchemy import select

import airflow.settings as settings
Expand All @@ -58,6 +57,11 @@

log = logging.getLogger(__name__)

# Use a no-op setproctitle on macOS (logging the skip at debug level via the
# module logger); elsewhere import the real implementation.
# NOTE(review): this relies on ``import sys`` at the top of the file -- the
# visible import hunk removes the old setproctitle import but does not show
# ``sys`` being added; confirm it is imported or this raises NameError.
if sys.platform == "darwin":

    def setproctitle(title):
        """No-op stand-in for setproctitle on macOS; logs that it was skipped."""
        log.debug("Mac OS detected, skipping setproctitle")

else:
    from setproctitle import setproctitle

if TYPE_CHECKING:
from typing import TypeAlias

Expand Down
8 changes: 6 additions & 2 deletions providers/edge3/src/airflow/providers/edge3/cli/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import logging
import os
import signal
import sys
from datetime import datetime
from http import HTTPStatus
from multiprocessing import Process
Expand Down Expand Up @@ -60,6 +61,11 @@

logger = logging.getLogger(__name__)

# Use a no-op setproctitle on macOS (logging the skip at debug level via the
# module-level ``logger``); elsewhere import the real implementation.
# A ``def`` is used instead of an assigned lambda (PEP 8 E731) so the
# replacement has a proper name and docstring.
if sys.platform == "darwin":

    def setproctitle(title):
        """No-op stand-in for setproctitle on macOS; logs that it was skipped."""
        logger.debug("Mac OS detected, skipping setproctitle")

else:
    from setproctitle import setproctitle


def _edge_hostname() -> str:
"""Get the hostname of the edge worker that should be reported by tasks."""
Expand Down Expand Up @@ -176,8 +182,6 @@ def _launch_job_af3(self, edge_job: EdgeJobFetched) -> tuple[Process, Path]:
def _run_job_via_supervisor(
workload: ExecuteTask,
) -> int:
from setproctitle import setproctitle

from airflow.sdk.execution_time.supervisor import supervise

# Ignore ctrl-c in this process -- we don't want to kill _this_ one. we let tasks run to completion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@

import logging
import os
import sys
from concurrent.futures import ProcessPoolExecutor
from datetime import datetime
from typing import TYPE_CHECKING

import psutil
from openlineage.client.serde import Serde
from setproctitle import getproctitle, setproctitle

from airflow import settings
from airflow.listeners import hookimpl
Expand Down Expand Up @@ -57,6 +57,13 @@
from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance
from airflow.settings import Session

if sys.platform == "darwin":
    # getproctitle is still needed on macOS; only *setting* the title is
    # skipped.
    # NOTE(review): this still imports the setproctitle C extension on
    # macOS -- if the goal is to avoid loading that extension entirely,
    # getproctitle would need a stub as well; confirm the intent.
    from setproctitle import getproctitle

    # Hoist the logger out of the call: the original lambda re-evaluated
    # logging.getLogger(__name__) on every invocation (and assigning a
    # lambda to a name violates PEP 8 E731).
    _setproctitle_log = logging.getLogger(__name__)

    def setproctitle(title):
        """No-op replacement for setproctitle on macOS; logs the skipped call."""
        _setproctitle_log.debug("Mac OS detected, skipping setproctitle")

else:
    from setproctitle import getproctitle, setproctitle

_openlineage_listener: OpenLineageListener | None = None


Expand Down
Loading