Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions providers/standard/docs/operators/python.rst
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@ pip install configurations. Passed index urls replace the standard system config
To prevent adding secrets to the private repository in your DAG code you can use the Airflow
:doc:`apache-airflow:authoring-and-scheduling/connections`. For this purpose the connection type ``Package Index (Python)`` can be used.

The first ``index_url`` in the list will be used as main index URL (``index-url`` for pip or ``default-index`` for uv) of the virtual environment setup.
Additional URLs will be added as extra index URLs. If you provide both parameters ``index_urls`` and ``index_urls_from_connection_ids``, the first URL in
the ``index_urls`` will be used as the main index URL and the rest will be added as extra index URLs.

In the special case you want to prevent remote calls for setup of a virtual environment, pass the ``index_urls`` as empty list as ``index_urls=[]`` which
forced pip installer to use the ``--no-index`` option.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,15 @@ def _generate_pip_conf(conf_file: Path, index_urls: list[str]) -> None:
conf_file.write_text(f"[global]\n{pip_conf_options}")


def _index_urls_to_uv_env_vars(index_urls: list[str] | None = None) -> dict[str, str]:
uv_index_env_vars = {}
if index_urls:
uv_index_env_vars = {"UV_DEFAULT_INDEX": index_urls[0]}
if len(index_urls) > 1:
uv_index_env_vars["UV_INDEX"] = " ".join(x for x in index_urls[1:])
return uv_index_env_vars


def prepare_virtualenv(
venv_directory: str,
python_bin: str,
Expand Down Expand Up @@ -174,7 +183,7 @@ def prepare_virtualenv(
)

if pip_cmd:
execute_in_subprocess(pip_cmd)
execute_in_subprocess(pip_cmd, env={**os.environ, **_index_urls_to_uv_env_vars(index_urls)})

return f"{venv_directory}/bin/python"

Expand Down
31 changes: 26 additions & 5 deletions providers/standard/tests/unit/standard/operators/test_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
_PythonVersionInfo,
get_current_context,
)
from airflow.providers.standard.utils.python_virtualenv import prepare_virtualenv
from airflow.providers.standard.utils.python_virtualenv import execute_in_subprocess, prepare_virtualenv
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.state import DagRunState, State, TaskInstanceState
Expand Down Expand Up @@ -1321,17 +1321,38 @@ def f(a):

self.run_as_task(f, system_site_packages=False, op_args=[4])

def test_with_index_urls(self):
@mock.patch(
"airflow.providers.standard.utils.python_virtualenv.execute_in_subprocess",
wraps=execute_in_subprocess,
)
def test_with_index_urls(self, wrapped_execute_in_subprocess):
def f(a):
import sys
from pathlib import Path

pip_conf = (Path(sys.executable).parents[1] / "pip.conf").read_text()
assert "abc.def.de" in pip_conf
assert "xyz.abc.de" in pip_conf
assert "first.package.index" in pip_conf
assert "second.package.index" in pip_conf
assert "third.package.index" in pip_conf
return a

self.run_as_task(f, index_urls=["https://abc.def.de", "http://xyz.abc.de"], op_args=[4])
self.run_as_task(
f,
index_urls=[
"https://first.package.index",
"http://second.package.index",
"http://third.package.index",
],
op_args=[4],
)

# first call creates venv, second call installs packages
package_install_call_args = wrapped_execute_in_subprocess.call_args[1]
assert package_install_call_args["env"]["UV_DEFAULT_INDEX"] == "https://first.package.index"
assert (
package_install_call_args["env"]["UV_INDEX"]
== "http://second.package.index http://third.package.index"
)

def test_caching(self):
def f(a):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ def test_pip_install_options_pip(self, mock_execute_in_subprocess):

assert python_bin == "/VENV/bin/python"
mock_execute_in_subprocess.assert_called_with(
["/VENV/bin/pip", "install", *pip_install_options, "apache-beam[gcp]"]
["/VENV/bin/pip", "install", *pip_install_options, "apache-beam[gcp]"],
env=mock.ANY,
)

@mock.patch("airflow.providers.standard.utils.python_virtualenv.execute_in_subprocess")
Expand All @@ -159,7 +160,16 @@ def test_pip_install_options_uv(self, mock_execute_in_subprocess):

assert python_bin == "/VENV/bin/python"
mock_execute_in_subprocess.assert_called_with(
["uv", "pip", "install", "--python", "/VENV/bin/python", *pip_install_options, "apache-beam[gcp]"]
[
"uv",
"pip",
"install",
"--python",
"/VENV/bin/python",
*pip_install_options,
"apache-beam[gcp]",
],
env=mock.ANY,
)

@mock.patch("airflow.providers.standard.utils.python_virtualenv.execute_in_subprocess")
Expand All @@ -175,7 +185,9 @@ def test_should_create_virtualenv_with_extra_packages_pip(self, mock_execute_in_

mock_execute_in_subprocess.assert_any_call(["pythonVER", "-m", "venv", "/VENV"])

mock_execute_in_subprocess.assert_called_with(["/VENV/bin/pip", "install", "apache-beam[gcp]"])
mock_execute_in_subprocess.assert_called_with(
["/VENV/bin/pip", "install", "apache-beam[gcp]"], env=mock.ANY
)

@mock.patch("airflow.providers.standard.utils.python_virtualenv.execute_in_subprocess")
@conf_vars({("standard", "venv_install_method"): "uv"})
Expand All @@ -189,7 +201,8 @@ def test_should_create_virtualenv_with_extra_packages_uv(self, mock_execute_in_s
assert python_bin == "/VENV/bin/python"

mock_execute_in_subprocess.assert_called_with(
["uv", "pip", "install", "--python", "/VENV/bin/python", "apache-beam[gcp]"]
["uv", "pip", "install", "--python", "/VENV/bin/python", "apache-beam[gcp]"],
env=mock.ANY,
)

@pytest.mark.parametrize(
Expand Down