Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,15 @@
from airflow.models.variable import Variable
from airflow.providers.common.compat.sdk import context_merge
from airflow.providers.standard.hooks.package_index import PackageIndexHook
from airflow.providers.standard.utils.python_virtualenv import prepare_virtualenv, write_python_script
from airflow.providers.standard.utils.python_virtualenv import (
_execute_in_subprocess,
prepare_virtualenv,
write_python_script,
)
from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS, BaseOperator
from airflow.utils import hashlib_wrapper
from airflow.utils.file import get_unique_dag_module_name
from airflow.utils.operator_helpers import KeywordParameters
from airflow.utils.process_utils import execute_in_subprocess

if AIRFLOW_V_3_0_PLUS:
from airflow.providers.standard.operators.branch import BaseBranchOperator
Expand Down Expand Up @@ -572,7 +575,7 @@ def _execute_python_callable_in_subprocess(self, python_path: Path):
os.fspath(termination_log_path),
os.fspath(airflow_context_path),
]
execute_in_subprocess(
_execute_in_subprocess(
cmd=cmd,
env=env_vars,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@

from __future__ import annotations

import logging
import os
import shlex
import shutil
import subprocess
import warnings
from pathlib import Path

import jinja2
from jinja2 import select_autoescape

from airflow.configuration import conf
from airflow.utils.process_utils import execute_in_subprocess


def _is_uv_installed() -> bool:
Expand Down Expand Up @@ -132,6 +134,37 @@ def _index_urls_to_uv_env_vars(index_urls: list[str] | None = None) -> dict[str,
return uv_index_env_vars


def _execute_in_subprocess(cmd: list[str], cwd: str | None = None, env: dict[str, str] | None = None) -> None:
"""
Execute a process and stream output to logger.

:param cmd: command and arguments to run
:param cwd: Current working directory passed to the Popen constructor
:param env: Additional environment variables to set for the subprocess.
"""
log = logging.getLogger(__name__)

log.info("Executing cmd: %s", " ".join(shlex.quote(c) for c in cmd))
with subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=0,
close_fds=True,
cwd=cwd,
env=env,
) as proc:
log.info("Output:")
if proc.stdout:
with proc.stdout:
for line in iter(proc.stdout.readline, b""):
log.info("%s", line.decode().rstrip())

exit_code = proc.wait()
if exit_code != 0:
raise subprocess.CalledProcessError(exit_code, cmd)


def prepare_virtualenv(
venv_directory: str,
python_bin: str,
Expand Down Expand Up @@ -169,7 +202,7 @@ def prepare_virtualenv(
venv_cmd = _generate_uv_cmd(venv_directory, python_bin, system_site_packages)
else:
venv_cmd = _generate_venv_cmd(venv_directory, python_bin, system_site_packages)
execute_in_subprocess(venv_cmd)
_execute_in_subprocess(venv_cmd)

pip_cmd = None
if requirements is not None and len(requirements) != 0:
Expand All @@ -188,7 +221,7 @@ def prepare_virtualenv(
)

if pip_cmd:
execute_in_subprocess(pip_cmd, env={**os.environ, **_index_urls_to_uv_env_vars(index_urls)})
_execute_in_subprocess(pip_cmd, env={**os.environ, **_index_urls_to_uv_env_vars(index_urls)})

return f"{venv_directory}/bin/python"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
_PythonVersionInfo,
get_current_context,
)
from airflow.providers.standard.utils.python_virtualenv import execute_in_subprocess, prepare_virtualenv
from airflow.providers.standard.utils.python_virtualenv import _execute_in_subprocess, prepare_virtualenv
from airflow.utils.session import create_session
from airflow.utils.state import DagRunState, State, TaskInstanceState
from airflow.utils.types import NOTSET, DagRunType
Expand Down Expand Up @@ -1410,8 +1410,8 @@ def f(a):
self.run_as_task(f, system_site_packages=False, op_args=[4])

@mock.patch(
"airflow.providers.standard.utils.python_virtualenv.execute_in_subprocess",
wraps=execute_in_subprocess,
"airflow.providers.standard.utils.python_virtualenv._execute_in_subprocess",
wraps=_execute_in_subprocess,
)
def test_with_index_urls(self, wrapped_execute_in_subprocess):
def f(a):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def test_generate_pip_conf(
for term in unexpected_pip_conf_content:
assert term not in generated_conf

@mock.patch("airflow.providers.standard.utils.python_virtualenv.execute_in_subprocess")
@mock.patch("airflow.providers.standard.utils.python_virtualenv._execute_in_subprocess")
@conf_vars({("standard", "venv_install_method"): "pip"})
def test_should_create_virtualenv_pip(self, mock_execute_in_subprocess):
python_bin = prepare_virtualenv(
Expand All @@ -86,7 +86,7 @@ def test_should_create_virtualenv_pip(self, mock_execute_in_subprocess):
assert python_bin == "/VENV/bin/python"
mock_execute_in_subprocess.assert_called_once_with(["pythonVER", "-m", "venv", "/VENV"])

@mock.patch("airflow.providers.standard.utils.python_virtualenv.execute_in_subprocess")
@mock.patch("airflow.providers.standard.utils.python_virtualenv._execute_in_subprocess")
@conf_vars({("standard", "venv_install_method"): "uv"})
def test_should_create_virtualenv_uv(self, mock_execute_in_subprocess):
python_bin = prepare_virtualenv(
Expand All @@ -97,7 +97,7 @@ def test_should_create_virtualenv_uv(self, mock_execute_in_subprocess):
["uv", "venv", "--allow-existing", "--seed", "--python", "pythonVER", "/VENV"]
)

@mock.patch("airflow.providers.standard.utils.python_virtualenv.execute_in_subprocess")
@mock.patch("airflow.providers.standard.utils.python_virtualenv._execute_in_subprocess")
@conf_vars({("standard", "venv_install_method"): "pip"})
def test_should_create_virtualenv_with_system_packages_pip(self, mock_execute_in_subprocess):
python_bin = prepare_virtualenv(
Expand All @@ -108,7 +108,7 @@ def test_should_create_virtualenv_with_system_packages_pip(self, mock_execute_in
["pythonVER", "-m", "venv", "/VENV", "--system-site-packages"]
)

@mock.patch("airflow.providers.standard.utils.python_virtualenv.execute_in_subprocess")
@mock.patch("airflow.providers.standard.utils.python_virtualenv._execute_in_subprocess")
@conf_vars({("standard", "venv_install_method"): "uv"})
def test_should_create_virtualenv_with_system_packages_uv(self, mock_execute_in_subprocess):
python_bin = prepare_virtualenv(
Expand All @@ -128,7 +128,7 @@ def test_should_create_virtualenv_with_system_packages_uv(self, mock_execute_in_
]
)

@mock.patch("airflow.providers.standard.utils.python_virtualenv.execute_in_subprocess")
@mock.patch("airflow.providers.standard.utils.python_virtualenv._execute_in_subprocess")
@conf_vars({("standard", "venv_install_method"): "pip"})
def test_pip_install_options_pip(self, mock_execute_in_subprocess):
pip_install_options = ["--no-deps"]
Expand All @@ -146,7 +146,7 @@ def test_pip_install_options_pip(self, mock_execute_in_subprocess):
env=mock.ANY,
)

@mock.patch("airflow.providers.standard.utils.python_virtualenv.execute_in_subprocess")
@mock.patch("airflow.providers.standard.utils.python_virtualenv._execute_in_subprocess")
@conf_vars({("standard", "venv_install_method"): "uv"})
def test_pip_install_options_uv(self, mock_execute_in_subprocess):
pip_install_options = ["--no-deps"]
Expand All @@ -172,7 +172,7 @@ def test_pip_install_options_uv(self, mock_execute_in_subprocess):
env=mock.ANY,
)

@mock.patch("airflow.providers.standard.utils.python_virtualenv.execute_in_subprocess")
@mock.patch("airflow.providers.standard.utils.python_virtualenv._execute_in_subprocess")
@conf_vars({("standard", "venv_install_method"): "pip"})
def test_should_create_virtualenv_with_extra_packages_pip(self, mock_execute_in_subprocess):
python_bin = prepare_virtualenv(
Expand All @@ -189,7 +189,7 @@ def test_should_create_virtualenv_with_extra_packages_pip(self, mock_execute_in_
["/VENV/bin/pip", "install", "apache-beam[gcp]"], env=mock.ANY
)

@mock.patch("airflow.providers.standard.utils.python_virtualenv.execute_in_subprocess")
@mock.patch("airflow.providers.standard.utils.python_virtualenv._execute_in_subprocess")
@conf_vars({("standard", "venv_install_method"): "uv"})
def test_should_create_virtualenv_with_extra_packages_uv(self, mock_execute_in_subprocess):
python_bin = prepare_virtualenv(
Expand Down
Loading