diff --git a/providers/standard/src/airflow/providers/standard/operators/python.py b/providers/standard/src/airflow/providers/standard/operators/python.py index 8436592e38864..160617c2b789b 100644 --- a/providers/standard/src/airflow/providers/standard/operators/python.py +++ b/providers/standard/src/airflow/providers/standard/operators/python.py @@ -51,12 +51,15 @@ from airflow.models.variable import Variable from airflow.providers.common.compat.sdk import context_merge from airflow.providers.standard.hooks.package_index import PackageIndexHook -from airflow.providers.standard.utils.python_virtualenv import prepare_virtualenv, write_python_script +from airflow.providers.standard.utils.python_virtualenv import ( + _execute_in_subprocess, + prepare_virtualenv, + write_python_script, +) from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS, BaseOperator from airflow.utils import hashlib_wrapper from airflow.utils.file import get_unique_dag_module_name from airflow.utils.operator_helpers import KeywordParameters -from airflow.utils.process_utils import execute_in_subprocess if AIRFLOW_V_3_0_PLUS: from airflow.providers.standard.operators.branch import BaseBranchOperator @@ -572,7 +575,7 @@ def _execute_python_callable_in_subprocess(self, python_path: Path): os.fspath(termination_log_path), os.fspath(airflow_context_path), ] - execute_in_subprocess( + _execute_in_subprocess( cmd=cmd, env=env_vars, ) diff --git a/providers/standard/src/airflow/providers/standard/utils/python_virtualenv.py b/providers/standard/src/airflow/providers/standard/utils/python_virtualenv.py index d89d0ed89717a..ee71f33a56056 100644 --- a/providers/standard/src/airflow/providers/standard/utils/python_virtualenv.py +++ b/providers/standard/src/airflow/providers/standard/utils/python_virtualenv.py @@ -19,8 +19,11 @@ from __future__ import annotations +import logging import os +import shlex import shutil +import subprocess import warnings from pathlib import Path @@ -28,7 +31,6 @@ from jinja2 import select_autoescape from airflow.configuration import conf -from airflow.utils.process_utils import execute_in_subprocess def _is_uv_installed() -> bool: @@ -132,6 +134,37 @@ def _index_urls_to_uv_env_vars(index_urls: list[str] | None = None) -> dict[str, return uv_index_env_vars +def _execute_in_subprocess(cmd: list[str], cwd: str | None = None, env: dict[str, str] | None = None) -> None: + """ + Execute a process and stream output to logger. + + :param cmd: command and arguments to run + :param cwd: Current working directory passed to the Popen constructor + :param env: Additional environment variables to set for the subprocess. + """ + log = logging.getLogger(__name__) + + log.info("Executing cmd: %s", " ".join(shlex.quote(c) for c in cmd)) + with subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + bufsize=0, + close_fds=True, + cwd=cwd, + env=env, + ) as proc: + log.info("Output:") + if proc.stdout: + with proc.stdout: + for line in iter(proc.stdout.readline, b""): + log.info("%s", line.decode().rstrip()) + + exit_code = proc.wait() + if exit_code != 0: + raise subprocess.CalledProcessError(exit_code, cmd) + + def prepare_virtualenv( venv_directory: str, python_bin: str, @@ -169,7 +202,7 @@ def prepare_virtualenv( venv_cmd = _generate_uv_cmd(venv_directory, python_bin, system_site_packages) else: venv_cmd = _generate_venv_cmd(venv_directory, python_bin, system_site_packages) - execute_in_subprocess(venv_cmd) + _execute_in_subprocess(venv_cmd) pip_cmd = None if requirements is not None and len(requirements) != 0: @@ -188,7 +221,7 @@ def prepare_virtualenv( ) if pip_cmd: - execute_in_subprocess(pip_cmd, env={**os.environ, **_index_urls_to_uv_env_vars(index_urls)}) + _execute_in_subprocess(pip_cmd, env={**os.environ, **_index_urls_to_uv_env_vars(index_urls)}) return f"{venv_directory}/bin/python" diff --git a/providers/standard/tests/unit/standard/operators/test_python.py b/providers/standard/tests/unit/standard/operators/test_python.py index 197a1b3141422..fe33cad1ce0c5 100644 --- a/providers/standard/tests/unit/standard/operators/test_python.py +++ b/providers/standard/tests/unit/standard/operators/test_python.py @@ -61,7 +61,7 @@ _PythonVersionInfo, get_current_context, ) -from airflow.providers.standard.utils.python_virtualenv import execute_in_subprocess, prepare_virtualenv +from airflow.providers.standard.utils.python_virtualenv import _execute_in_subprocess, prepare_virtualenv from airflow.utils.session import create_session from airflow.utils.state import DagRunState, State, TaskInstanceState from airflow.utils.types import NOTSET, DagRunType @@ -1410,8 +1410,8 @@ def f(a): self.run_as_task(f, system_site_packages=False, op_args=[4]) @mock.patch( - "airflow.providers.standard.utils.python_virtualenv.execute_in_subprocess", - wraps=execute_in_subprocess, + "airflow.providers.standard.utils.python_virtualenv._execute_in_subprocess", + wraps=_execute_in_subprocess, ) def test_with_index_urls(self, wrapped_execute_in_subprocess): def f(a): diff --git a/providers/standard/tests/unit/standard/utils/test_python_virtualenv.py b/providers/standard/tests/unit/standard/utils/test_python_virtualenv.py index 1f28cfd8293c0..fca9c3da16294 100644 --- a/providers/standard/tests/unit/standard/utils/test_python_virtualenv.py +++ b/providers/standard/tests/unit/standard/utils/test_python_virtualenv.py @@ -77,7 +77,7 @@ def test_generate_pip_conf( for term in unexpected_pip_conf_content: assert term not in generated_conf - @mock.patch("airflow.providers.standard.utils.python_virtualenv.execute_in_subprocess") + @mock.patch("airflow.providers.standard.utils.python_virtualenv._execute_in_subprocess") @conf_vars({("standard", "venv_install_method"): "pip"}) def test_should_create_virtualenv_pip(self, mock_execute_in_subprocess): python_bin = prepare_virtualenv( @@ -86,7 +86,7 @@ def test_should_create_virtualenv_pip(self, mock_execute_in_subprocess): assert python_bin == "/VENV/bin/python" mock_execute_in_subprocess.assert_called_once_with(["pythonVER", "-m", "venv", "/VENV"]) - @mock.patch("airflow.providers.standard.utils.python_virtualenv.execute_in_subprocess") + @mock.patch("airflow.providers.standard.utils.python_virtualenv._execute_in_subprocess") @conf_vars({("standard", "venv_install_method"): "uv"}) def test_should_create_virtualenv_uv(self, mock_execute_in_subprocess): python_bin = prepare_virtualenv( @@ -97,7 +97,7 @@ def test_should_create_virtualenv_uv(self, mock_execute_in_subprocess): ["uv", "venv", "--allow-existing", "--seed", "--python", "pythonVER", "/VENV"] ) - @mock.patch("airflow.providers.standard.utils.python_virtualenv.execute_in_subprocess") + @mock.patch("airflow.providers.standard.utils.python_virtualenv._execute_in_subprocess") @conf_vars({("standard", "venv_install_method"): "pip"}) def test_should_create_virtualenv_with_system_packages_pip(self, mock_execute_in_subprocess): python_bin = prepare_virtualenv( @@ -108,7 +108,7 @@ def test_should_create_virtualenv_with_system_packages_pip(self, mock_execute_in ["pythonVER", "-m", "venv", "/VENV", "--system-site-packages"] ) - @mock.patch("airflow.providers.standard.utils.python_virtualenv.execute_in_subprocess") + @mock.patch("airflow.providers.standard.utils.python_virtualenv._execute_in_subprocess") @conf_vars({("standard", "venv_install_method"): "uv"}) def test_should_create_virtualenv_with_system_packages_uv(self, mock_execute_in_subprocess): python_bin = prepare_virtualenv( @@ -128,7 +128,7 @@ def test_should_create_virtualenv_with_system_packages_uv(self, mock_execute_in_ ] ) - @mock.patch("airflow.providers.standard.utils.python_virtualenv.execute_in_subprocess") + @mock.patch("airflow.providers.standard.utils.python_virtualenv._execute_in_subprocess") @conf_vars({("standard", "venv_install_method"): "pip"}) def test_pip_install_options_pip(self, mock_execute_in_subprocess): pip_install_options = ["--no-deps"] @@ -146,7 +146,7 @@ def test_pip_install_options_pip(self, mock_execute_in_subprocess): env=mock.ANY, ) - @mock.patch("airflow.providers.standard.utils.python_virtualenv.execute_in_subprocess") + @mock.patch("airflow.providers.standard.utils.python_virtualenv._execute_in_subprocess") @conf_vars({("standard", "venv_install_method"): "uv"}) def test_pip_install_options_uv(self, mock_execute_in_subprocess): pip_install_options = ["--no-deps"] @@ -172,7 +172,7 @@ def test_pip_install_options_uv(self, mock_execute_in_subprocess): env=mock.ANY, ) - @mock.patch("airflow.providers.standard.utils.python_virtualenv.execute_in_subprocess") + @mock.patch("airflow.providers.standard.utils.python_virtualenv._execute_in_subprocess") @conf_vars({("standard", "venv_install_method"): "pip"}) def test_should_create_virtualenv_with_extra_packages_pip(self, mock_execute_in_subprocess): python_bin = prepare_virtualenv( @@ -189,7 +189,7 @@ def test_should_create_virtualenv_with_extra_packages_pip(self, mock_execute_in_ ["/VENV/bin/pip", "install", "apache-beam[gcp]"], env=mock.ANY ) - @mock.patch("airflow.providers.standard.utils.python_virtualenv.execute_in_subprocess") + @mock.patch("airflow.providers.standard.utils.python_virtualenv._execute_in_subprocess") @conf_vars({("standard", "venv_install_method"): "uv"}) def test_should_create_virtualenv_with_extra_packages_uv(self, mock_execute_in_subprocess): python_bin = prepare_virtualenv(