From d5201ac5f818c7448ce2a6f440e020e21a8e0843 Mon Sep 17 00:00:00 2001 From: Daniel Wolf <95075445+wolfdn@users.noreply.github.com> Date: Thu, 26 Jun 2025 14:21:21 +0200 Subject: [PATCH 1/6] Add support for `PackageIndex` connections in `PythonVirtualenvOperator` --- .../providers/standard/operators/python.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/providers/standard/src/airflow/providers/standard/operators/python.py b/providers/standard/src/airflow/providers/standard/operators/python.py index 5a8bd9e87b657..19b58e1497ba9 100644 --- a/providers/standard/src/airflow/providers/standard/operators/python.py +++ b/providers/standard/src/airflow/providers/standard/operators/python.py @@ -50,6 +50,7 @@ ) from airflow.models.baseoperator import BaseOperator from airflow.models.variable import Variable +from airflow.providers.standard.hooks.package_index import PackageIndexHook from airflow.providers.standard.utils.python_virtualenv import prepare_virtualenv, write_python_script from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS from airflow.utils import hashlib_wrapper @@ -656,6 +657,8 @@ class PythonVirtualenvOperator(_BasePythonVirtualenvOperator): exit code will be treated as a failure. :param index_urls: an optional list of index urls to load Python packages from. If not provided the system pip conf will be used to source packages from. + :param index_urls_from_connection_ids: An optional list of Package Index connection IDs. + Will be appended to `index_urls`. :param venv_cache_path: Optional path to the virtual environment parent folder in which the virtual environment will be cached, creates a sub-folder venv-{hash} whereas hash will be replaced with a checksum of requirements. If not provided the virtual environment will be created and deleted @@ -669,7 +672,7 @@ class PythonVirtualenvOperator(_BasePythonVirtualenvOperator): """ template_fields: Sequence[str] = tuple( - {"requirements", "index_urls", "venv_cache_path"}.union(PythonOperator.template_fields) + {"requirements", "index_urls", "index_urls_from_connection_ids", "venv_cache_path"}.union(PythonOperator.template_fields) ) template_ext: Sequence[str] = (".txt",) @@ -690,6 +693,7 @@ def __init__( expect_airflow: bool = True, skip_on_exit_code: int | Container[int] | None = None, index_urls: None | Collection[str] | str = None, + index_urls_from_connection_ids: list[str] | None = None, venv_cache_path: None | os.PathLike[str] = None, env_vars: dict[str, str] | None = None, inherit_env: bool = True, @@ -724,6 +728,7 @@ def __init__( self.index_urls = list(index_urls) else: self.index_urls = None + self.index_urls_from_connection_ids = index_urls_from_connection_ids self.venv_cache_path = venv_cache_path super().__init__( python_callable=python_callable, @@ -850,7 +855,19 @@ def _ensure_venv_cache_exists(self, venv_cache_path: Path) -> Path: self.log.info("New Python virtual environment created in %s", venv_path) return venv_path + def _retrieve_index_urls_from_connection_ids(self): + """Retrieve index URLs from Package Index connections.""" + if self.index_urls is None: + self.index_urls = [] + for conn_id in self.index_urls_from_connection_ids: + conn_url = PackageIndexHook(conn_id).get_connection_url() + log.info("Adding index URL from connection %s: %s", conn_id, conn_url) + self.index_urls.append(PackageIndexHook(conn_id).get_connection_url()) + def execute_callable(self): + if self.index_urls_from_connection_ids: + self._retrieve_index_urls_from_connection_ids() + if self.venv_cache_path: venv_path = self._ensure_venv_cache_exists(Path(self.venv_cache_path)) python_path = venv_path / "bin" / "python" From cdb5c84a9d41bb166dc796d0803907944c28e88e Mon Sep 17 00:00:00 2001 From: Daniel Wolf <95075445+wolfdn@users.noreply.github.com> Date: Thu, 26 Jun 2025 15:27:01 +0200 Subject: [PATCH 2/6] Fix formatting --- .../src/airflow/providers/standard/operators/python.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/providers/standard/src/airflow/providers/standard/operators/python.py b/providers/standard/src/airflow/providers/standard/operators/python.py index 19b58e1497ba9..67003284cccba 100644 --- a/providers/standard/src/airflow/providers/standard/operators/python.py +++ b/providers/standard/src/airflow/providers/standard/operators/python.py @@ -672,7 +672,9 @@ class PythonVirtualenvOperator(_BasePythonVirtualenvOperator): """ template_fields: Sequence[str] = tuple( - {"requirements", "index_urls", "index_urls_from_connection_ids", "venv_cache_path"}.union(PythonOperator.template_fields) + {"requirements", "index_urls", "index_urls_from_connection_ids", "venv_cache_path"}.union( + PythonOperator.template_fields + ) ) template_ext: Sequence[str] = (".txt",) From 79a90302668a88e17f71ba281026e5d2349a384b Mon Sep 17 00:00:00 2001 From: Daniel Wolf <95075445+wolfdn@users.noreply.github.com> Date: Thu, 26 Jun 2025 15:30:45 +0200 Subject: [PATCH 3/6] Cleanup --- .../src/airflow/providers/standard/operators/python.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/providers/standard/src/airflow/providers/standard/operators/python.py b/providers/standard/src/airflow/providers/standard/operators/python.py index 67003284cccba..b4cc3cc9a1526 100644 --- a/providers/standard/src/airflow/providers/standard/operators/python.py +++ b/providers/standard/src/airflow/providers/standard/operators/python.py @@ -657,8 +657,8 @@ class PythonVirtualenvOperator(_BasePythonVirtualenvOperator): exit code will be treated as a failure. :param index_urls: an optional list of index urls to load Python packages from. If not provided the system pip conf will be used to source packages from. - :param index_urls_from_connection_ids: An optional list of Package Index connection IDs. - Will be appended to `index_urls`. + :param index_urls_from_connection_ids: An optional list of ``PackageIndex`` connection IDs. + Will be appended to ``index_urls``. :param venv_cache_path: Optional path to the virtual environment parent folder in which the virtual environment will be cached, creates a sub-folder venv-{hash} whereas hash will be replaced with a checksum of requirements. If not provided the virtual environment will be created and deleted @@ -863,8 +863,7 @@ def _retrieve_index_urls_from_connection_ids(self): self.index_urls = [] for conn_id in self.index_urls_from_connection_ids: conn_url = PackageIndexHook(conn_id).get_connection_url() - log.info("Adding index URL from connection %s: %s", conn_id, conn_url) - self.index_urls.append(PackageIndexHook(conn_id).get_connection_url()) + self.index_urls.append(conn_url) def execute_callable(self): if self.index_urls_from_connection_ids: From f5b04c933906877207bfe1a5a8f3745d785b17f1 Mon Sep 17 00:00:00 2001 From: Daniel Wolf <95075445+wolfdn@users.noreply.github.com> Date: Fri, 27 Jun 2025 08:12:10 +0200 Subject: [PATCH 4/6] Allow `str` or `Collection` for `index_urls_from_connection_ids` --- .../src/airflow/providers/standard/operators/python.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/providers/standard/src/airflow/providers/standard/operators/python.py b/providers/standard/src/airflow/providers/standard/operators/python.py index b4cc3cc9a1526..dfca8d4f42a51 100644 --- a/providers/standard/src/airflow/providers/standard/operators/python.py +++ b/providers/standard/src/airflow/providers/standard/operators/python.py @@ -695,7 +695,7 @@ def __init__( expect_airflow: bool = True, skip_on_exit_code: int | Container[int] | None = None, index_urls: None | Collection[str] | str = None, - index_urls_from_connection_ids: list[str] | None = None, + index_urls_from_connection_ids: None | Collection[str] | str = None, venv_cache_path: None | os.PathLike[str] = None, env_vars: dict[str, str] | None = None, inherit_env: bool = True, @@ -730,7 +730,12 @@ def __init__( self.index_urls = list(index_urls) else: self.index_urls = None - self.index_urls_from_connection_ids = index_urls_from_connection_ids + if isinstance(index_urls_from_connection_ids, str): + self.index_urls_from_connection_ids: list[str] | None = [index_urls_from_connection_ids] + elif isinstance(index_urls_from_connection_ids, Collection): + self.index_urls_from_connection_ids = list(index_urls_from_connection_ids) + else: + self.index_urls_from_connection_ids = None self.venv_cache_path = venv_cache_path super().__init__( python_callable=python_callable, From 79b5d528c096b5ba5f309153ec5548102330e004 Mon Sep 17 00:00:00 2001 From: Daniel Wolf <95075445+wolfdn@users.noreply.github.com> Date: Fri, 27 Jun 2025 08:19:54 +0200 Subject: [PATCH 5/6] Update documentation --- providers/standard/docs/operators/python.rst | 3 +++ 1 file changed, 3 insertions(+) diff --git a/providers/standard/docs/operators/python.rst b/providers/standard/docs/operators/python.rst index 60e2c83b5f1e7..bfd069cdbadb2 100644 --- a/providers/standard/docs/operators/python.rst +++ b/providers/standard/docs/operators/python.rst @@ -196,6 +196,9 @@ If you want to use additional task specific private python repositories to setup pip install configurations. Passed index urls replace the standard system configured index url settings. To prevent adding secrets to the private repository in your DAG code you can use the Airflow :doc:`apache-airflow:authoring-and-scheduling/connections`. For this purpose the connection type ``Package Index (Python)`` can be used. +In the ``Package Index (Python)`` connection type you can specify the index URL and credentials for the private repository. +After creating a ``Package Index (Python)`` connection, you can provide the connection ID to the ``PythonVirtualenvOperator`` using the ``index_urls_from_connection_ids`` parameter. +The ``PythonVirtualenvOperator`` will automatically append the index URLs from the connection to the ``index_urls`` parameter of the pip installer including the provided credentials. In the special case you want to prevent remote calls for setup of a virtual environment, pass the ``index_urls`` as empty list as ``index_urls=[]`` which forced pip installer to use the ``--no-index`` option. From 6643e8b330c2d8b69eee1d44832f3e2ec3f9fe2d Mon Sep 17 00:00:00 2001 From: Daniel Wolf <95075445+wolfdn@users.noreply.github.com> Date: Fri, 27 Jun 2025 12:45:51 +0200 Subject: [PATCH 6/6] Add unit test --- .../unit/standard/operators/test_python.py | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/providers/standard/tests/unit/standard/operators/test_python.py b/providers/standard/tests/unit/standard/operators/test_python.py index a63bb059e6863..9f42593fcf772 100644 --- a/providers/standard/tests/unit/standard/operators/test_python.py +++ b/providers/standard/tests/unit/standard/operators/test_python.py @@ -48,6 +48,7 @@ DeserializingResultError, ) from airflow.models.baseoperator import BaseOperator +from airflow.models.connection import Connection from airflow.models.dag import DAG from airflow.models.taskinstance import TaskInstance, clear_task_instances, set_current_context from airflow.providers.standard.operators.empty import EmptyOperator @@ -1327,6 +1328,37 @@ def f(a): self.run_as_task(f, index_urls=["https://abc.def.de", "http://xyz.abc.de"], op_args=[4]) + def test_with_index_url_from_connection(self, monkeypatch): + class MockConnection(Connection): + """Mock for the Connection class.""" + + def __init__(self, host: str | None, login: str | None, password: str | None): + super().__init__() + self.host = host + self.login = login + self.password = password + + monkeypatch.setattr( + "airflow.providers.standard.hooks.package_index.PackageIndexHook.get_connection", + lambda *_: MockConnection("https://my.package.index", "my_username", "my_password"), + ) + + def f(a): + import sys + from pathlib import Path + + pip_conf = (Path(sys.executable).parents[1] / "pip.conf").read_text() + assert "abc.def.de" in pip_conf + assert "https://my_username:my_password@my.package.index" in pip_conf + return a + + self.run_as_task( + f, + index_urls=["https://abc.def.de"], + index_urls_from_connection_ids=["my_connection"], + op_args=[4], + ) + def test_caching(self): def f(a): import sys