From d895e7eb25fc1e8b26d47beed106d81c6bb07820 Mon Sep 17 00:00:00 2001 From: AutomationDev85 Date: Wed, 3 Dec 2025 14:02:59 +0100 Subject: [PATCH] Fix finally return handling --- .../airflow/providers/cncf/kubernetes/operators/pod.py | 5 +++-- .../tests/unit/cncf/kubernetes/operators/test_pod.py | 10 +++++++--- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py index 08182a32d5dbe..41a331cd24cd6 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py @@ -937,8 +937,9 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any: raise finally: self._clean(event=event, context=context, result=xcom_sidecar_output) - if self.do_xcom_push: - return xcom_sidecar_output + + if self.do_xcom_push and xcom_sidecar_output: + context["ti"].xcom_push(XCOM_RETURN_KEY, xcom_sidecar_output) def _clean(self, event: dict[str, Any], result: dict | None, context: Context) -> None: if self.pod is None: diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py index 91069375b8d87..e5e19da79ff0d 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py @@ -44,6 +44,7 @@ from airflow.providers.cncf.kubernetes.triggers.pod import KubernetesPodTrigger from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction, PodLoggingStatus, PodPhase from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults +from airflow.providers.common.compat.sdk import XCOM_RETURN_KEY from airflow.utils import timezone from airflow.utils.session import create_session from airflow.utils.types import DagRunType @@ -1734,7 +1735,7 @@ def test_xcom_push_failed_pod(self, remote_pod, mock_await, mock_extract_xcom): with pytest.raises(AirflowException): k.execute(context=context) - context["ti"].xcom_push.assert_called_with("return_value", {"Test key": "Test value"}) + context["ti"].xcom_push.assert_called_with(XCOM_RETURN_KEY, {"Test key": "Test value"}) @pytest.mark.asyncio @pytest.mark.parametrize( @@ -2797,7 +2798,10 @@ def test_async_kpo_wait_termination_before_cleanup_on_success( } k = KubernetesPodOperator(task_id="task", deferrable=True, do_xcom_push=do_xcom_push) - result = k.trigger_reentry({}, success_event) + context = create_context(k) + context["ti"].xcom_push = MagicMock() + + result = k.trigger_reentry(context, success_event) # check if it gets the pod mocked_hook.return_value.get_pod.assert_called_once_with(TEST_NAME, TEST_NAMESPACE) @@ -2805,7 +2809,7 @@ def test_async_kpo_wait_termination_before_cleanup_on_success( # assert that the xcom are extracted/not extracted if do_xcom_push: mock_extract_xcom.assert_called_once() - assert result == mock_extract_xcom.return_value + context["ti"].xcom_push.assert_called_with(XCOM_RETURN_KEY, mock_extract_xcom.return_value) else: mock_extract_xcom.assert_not_called() assert result is None