Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -937,8 +937,9 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:
raise
finally:
self._clean(event=event, context=context, result=xcom_sidecar_output)
if self.do_xcom_push:
return xcom_sidecar_output

if self.do_xcom_push and xcom_sidecar_output:
context["ti"].xcom_push(XCOM_RETURN_KEY, xcom_sidecar_output)

def _clean(self, event: dict[str, Any], result: dict | None, context: Context) -> None:
if self.pod is None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from airflow.providers.cncf.kubernetes.triggers.pod import KubernetesPodTrigger
from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction, PodLoggingStatus, PodPhase
from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults
from airflow.providers.common.compat.sdk import XCOM_RETURN_KEY
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.types import DagRunType
Expand Down Expand Up @@ -1734,7 +1735,7 @@ def test_xcom_push_failed_pod(self, remote_pod, mock_await, mock_extract_xcom):
with pytest.raises(AirflowException):
k.execute(context=context)

context["ti"].xcom_push.assert_called_with("return_value", {"Test key": "Test value"})
context["ti"].xcom_push.assert_called_with(XCOM_RETURN_KEY, {"Test key": "Test value"})

@pytest.mark.asyncio
@pytest.mark.parametrize(
Expand Down Expand Up @@ -2797,15 +2798,18 @@ def test_async_kpo_wait_termination_before_cleanup_on_success(
}

k = KubernetesPodOperator(task_id="task", deferrable=True, do_xcom_push=do_xcom_push)
result = k.trigger_reentry({}, success_event)
context = create_context(k)
context["ti"].xcom_push = MagicMock()

result = k.trigger_reentry(context, success_event)

# check if it gets the pod
mocked_hook.return_value.get_pod.assert_called_once_with(TEST_NAME, TEST_NAMESPACE)

# assert that the xcom are extracted/not extracted
if do_xcom_push:
mock_extract_xcom.assert_called_once()
assert result == mock_extract_xcom.return_value
context["ti"].xcom_push.assert_called_with(XCOM_RETURN_KEY, mock_extract_xcom.return_value)
else:
mock_extract_xcom.assert_not_called()
assert result is None
Expand Down