diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py index 693932f022a6..49469aa5007d 100644 --- a/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/airflow/providers/cncf/kubernetes/operators/pod.py @@ -680,6 +680,10 @@ def execute_complete(self, context: Context, event: dict, **kwargs): message = f"{event['message']}\n{event['stack_trace']}" else: message = event["message"] + if self.do_xcom_push: + # In the event of base container failure, we need to kill the xcom sidecar. + # We disregard xcom output and do that here + _ = self.extract_xcom(pod=pod) raise AirflowException(message) elif event["status"] == "success": # fetch some logs when pod is executed successfully diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py index b896f57ee5d0..fa617a39fd05 100644 --- a/tests/providers/cncf/kubernetes/operators/test_pod.py +++ b/tests/providers/cncf/kubernetes/operators/test_pod.py @@ -2049,8 +2049,13 @@ def test_async_kpo_wait_termination_before_cleanup_on_failure( # assert that it does not push the xcom ti_mock.xcom_push.assert_not_called() - # assert that the xcom are not extracted - mock_extract_xcom.assert_not_called() + if do_xcom_push: + # assert that the xcom are not extracted if do_xcom_push is Fale + mock_extract_xcom.assert_called_once() + else: + # but that it is extracted when do_xcom_push is true because the sidecare + # needs to be terminated + mock_extract_xcom.assert_not_called() # check if it waits for the pod to complete assert read_pod_mock.call_count == 3