diff --git a/providers/standard/src/airflow/providers/standard/operators/python.py b/providers/standard/src/airflow/providers/standard/operators/python.py
index 3d8902dcfc0d1..37aa79c33433e 100644
--- a/providers/standard/src/airflow/providers/standard/operators/python.py
+++ b/providers/standard/src/airflow/providers/standard/operators/python.py
@@ -21,6 +21,7 @@
 import json
 import logging
 import os
+import re
 import shutil
 import subprocess
 import sys
@@ -35,6 +36,9 @@
 from typing import TYPE_CHECKING, Any, Callable, NamedTuple, cast
 
 import lazy_object_proxy
+from packaging.requirements import InvalidRequirement, Requirement
+from packaging.specifiers import InvalidSpecifier
+from packaging.version import InvalidVersion
 
 from airflow.exceptions import (
     AirflowConfigException,
@@ -848,10 +852,50 @@ def execute_callable(self):
 
     def _iter_serializable_context_keys(self):
+        """
+        Yield the names of the context keys that are treated as serializable.
+
+        The base keys are always yielded. The Airflow and pendulum keys are added when
+        ``system_site_packages`` is set or ``apache-airflow`` is listed in ``requirements``;
+        only the pendulum keys are added when ``pendulum`` is listed without ``apache-airflow``.
+
+        :raises ValueError: if an entry in ``requirements`` is not a valid requirement.
+        """
         yield from self.BASE_SERIALIZABLE_CONTEXT_KEYS
-        if self.system_site_packages or "apache-airflow" in self.requirements:
+
+        found_airflow = found_pendulum = False
+
+        if self.system_site_packages:
+            # If we're using system packages, assume both are present
+            found_airflow = found_pendulum = True
+        else:
+            for raw_str in self.requirements:
+                line = raw_str.strip()
+                # Skip blank lines, full-line comments and pip option lines ("--index-url ...")
+                if not line or line.startswith(("#", "-")):
+                    continue
+
+                # Strip off any inline comment, e.g. turn "foo==1.2.3 # comment" → "foo==1.2.3".
+                # As in pip's requirements file format, "#" only starts a comment after
+                # whitespace, so URL fragments such as "...whl#sha256=..." are kept intact.
+                req_str = re.sub(r"\s+#.*$", "", line)
+
+                try:
+                    req = Requirement(req_str)
+                except (InvalidRequirement, InvalidSpecifier, InvalidVersion) as e:
+                    raise ValueError(f"Invalid requirement '{raw_str}': {e}") from e
+
+                # Compare PEP 503-normalized names so e.g. "Apache_Airflow" is detected too
+                req_name = re.sub(r"[-_.]+", "-", req.name).lower()
+                if req_name == "apache-airflow":
+                    found_airflow = found_pendulum = True
+                    break
+                elif req_name == "pendulum":
+                    found_pendulum = True
+
+        if found_airflow:
             yield from self.AIRFLOW_SERIALIZABLE_CONTEXT_KEYS
             yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS
-        elif "pendulum" in self.requirements:
+        elif found_pendulum:
             yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS
 
 
diff --git a/providers/standard/tests/unit/standard/operators/test_python.py b/providers/standard/tests/unit/standard/operators/test_python.py
index d30938f4f00bf..e8a0091481647 100644
--- a/providers/standard/tests/unit/standard/operators/test_python.py
+++ b/providers/standard/tests/unit/standard/operators/test_python.py
@@ -1451,6 +1451,105 @@ def f(
 
         self.run_as_task(f, serializer=serializer, system_site_packages=False, requirements=None)
 
+    @pytest.mark.parametrize(
+        "requirements, system_site, want_airflow, want_pendulum",
+        [
+            # nothing → just base keys
+            ([], False, False, False),
+            # site-packages → base + airflow + pendulum keys
+            ([], True, True, True),
+            # apache-airflow / no version constraint
+            (["apache-airflow"], False, True, True),
+            # specific version
+            (["apache-airflow==2.10.2"], False, True, True),
+            # minimum version
+            (["apache-airflow>=2.10"], False, True, True),
+            # pendulum / no version constraint
+            (["pendulum"], False, False, True),
+            # compatible release
+            (["pendulum~=2.1.0"], False, False, True),
+            # other package
+            (["foo==1.0.0"], False, False, False),
+            # with other package
+            (["apache-airflow", "foo"], False, True, True),
+            # full-line comment only
+            (["# comment"], False, False, False),
+            # inline comment after requirement
+            (["apache-airflow==2.10.2 # comment"], False, True, True),
+            # blank line + requirement
+            (["", "pendulum"], False, False, True),
+            # indented comment + requirement
+            ([" # comment", "pendulum~=2.1.0"], False, False, True),
+            # name spelled differently, but equivalent after PEP 503 normalization
+            (["Apache_Airflow>=2.10"], False, True, True),
+            # pip option line + requirement
+            (["--extra-index-url https://example.com/simple", "pendulum"], False, False, True),
+            # direct reference whose URL carries a "#" fragment
+            (["pendulum @ https://example.com/pendulum.whl#sha256=abc123"], False, False, True),
+        ],
+    )
+    def test_iter_serializable_context_keys(self, requirements, system_site, want_airflow, want_pendulum):
+        def func():
+            return "test_return_value"
+
+        op = PythonVirtualenvOperator(
+            task_id="task",
+            python_callable=func,
+            requirements=requirements,
+            system_site_packages=system_site,
+        )
+        keys = set(op._iter_serializable_context_keys())
+
+        base_keys = set(op.BASE_SERIALIZABLE_CONTEXT_KEYS)
+        airflow_keys = set(op.AIRFLOW_SERIALIZABLE_CONTEXT_KEYS)
+        pendulum_keys = set(op.PENDULUM_SERIALIZABLE_CONTEXT_KEYS)
+
+        # BASE keys always present
+        assert base_keys <= keys
+
+        # AIRFLOW keys only when expected
+        if want_airflow:
+            assert airflow_keys <= keys, f"expected AIRFLOW keys for requirements: {requirements}"
+        else:
+            assert not (airflow_keys & keys), f"unexpected AIRFLOW keys for requirements: {requirements}"
+
+        # PENDULUM keys only when expected
+        if want_pendulum:
+            assert pendulum_keys <= keys, f"expected PENDULUM keys for requirements: {requirements}"
+        else:
+            assert not (pendulum_keys & keys), f"unexpected PENDULUM keys for requirements: {requirements}"
+
+    @pytest.mark.parametrize(
+        "invalid_requirement",
+        [
+            # invalid version format
+            "pendulum==3..0",
+            # invalid operator (=< instead of <=)
+            "apache-airflow=<2.0",
+            # same invalid operator on pendulum
+            "pendulum=<3.0",
+            # totally malformed
+            "invalid requirement",
+        ],
+    )
+    def test_iter_serializable_context_keys_invalid_requirement(self, invalid_requirement):
+        def func():
+            return "test_return_value"
+
+        op = PythonVirtualenvOperator(
+            task_id="task",
+            python_callable=func,
+            requirements=[invalid_requirement],
+            system_site_packages=False,
+        )
+
+        with pytest.raises(ValueError) as exc_info:
+            # Consume the generator to trigger parsing
+            list(op._iter_serializable_context_keys())
+
+        msg = str(exc_info.value)
+        assert f"Invalid requirement '{invalid_requirement}'" in msg
+
     # when venv tests are run in parallel to other test they create new processes and this might take
     # quite some time in shared docker environment and get some contention even between different containers