diff --git a/providers/standard/src/airflow/providers/standard/operators/python.py b/providers/standard/src/airflow/providers/standard/operators/python.py index 08aa4905770aa..da6214837b9ff 100644 --- a/providers/standard/src/airflow/providers/standard/operators/python.py +++ b/providers/standard/src/airflow/providers/standard/operators/python.py @@ -1076,7 +1076,7 @@ def execute_callable(self): def _iter_serializable_context_keys(self): yield from self.BASE_SERIALIZABLE_CONTEXT_KEYS - if self._get_airflow_version_from_target_env(): + if self.expect_airflow and self._get_airflow_version_from_target_env(): yield from self.AIRFLOW_SERIALIZABLE_CONTEXT_KEYS yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS elif self._is_pendulum_installed_in_target_env(): diff --git a/providers/standard/tests/unit/standard/operators/test_python.py b/providers/standard/tests/unit/standard/operators/test_python.py index 4f48c96affeeb..63bcd03aa313d 100644 --- a/providers/standard/tests/unit/standard/operators/test_python.py +++ b/providers/standard/tests/unit/standard/operators/test_python.py @@ -1781,6 +1781,28 @@ def f(): assert "Something went wrong" in caplog.text assert "returned non-zero exit status" in caplog.text + @mock.patch.object(ExternalPythonOperator, "_get_airflow_version_from_target_env") + def test_iter_serializable_context_keys_respects_expect_airflow_false(self, mock_get_airflow_version): + """Test that when expect_airflow=False, _get_airflow_version_from_target_env is not called.""" + + def f(): + return 42 + + op = ExternalPythonOperator( + python_callable=f, task_id="task", python=sys.executable, expect_airflow=False + ) + + keys = set(op._iter_serializable_context_keys()) + + mock_get_airflow_version.assert_not_called() + + # BASE keys should always be present + base_keys = set(op.BASE_SERIALIZABLE_CONTEXT_KEYS) + airflow_keys = set(op.AIRFLOW_SERIALIZABLE_CONTEXT_KEYS) + + assert base_keys <= keys + assert not (airflow_keys & keys), "Airflow keys should not be present when expect_airflow=False" + class BaseTestBranchPythonVirtualenvOperator(BaseTestPythonVirtualenvOperator): @pytest.fixture(autouse=True)