diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 76ee44ae9c7da..66cd4eefbe947 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -822,6 +822,10 @@ def await_xcom_sidecar_container_start( if self.container_is_running(pod, PodDefaults.SIDECAR_CONTAINER_NAME): self.log.info("The xcom sidecar container has started.") break + if self.container_is_terminated(pod, PodDefaults.SIDECAR_CONTAINER_NAME): + raise AirflowException( + "Xcom sidecar container is already terminated! Not possible to read xcom output of task." + ) if (time.time() - last_log_time) >= log_interval: self.log.warning( "Still waiting for the xcom sidecar container to start. Elapsed time: %d seconds.", diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py index c682310fba1cf..0ffbbd709bf75 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py @@ -646,13 +646,35 @@ def test_extract_xcom_none(self, mock_exec_xcom_kill, mock_kubernetes_stream): self.pod_manager.extract_xcom(pod=mock_pod) assert mock_exec_xcom_kill.call_count == 1 + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_terminated") @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running") - def test_await_xcom_sidecar_container_timeout(self, mock_container_is_running): + def test_await_xcom_sidecar_container_timeout( + self, mock_container_is_running, mock_container_is_terminated + ): mock_pod = MagicMock() mock_container_is_running.return_value = False + mock_container_is_terminated.return_value = False with pytest.raises(AirflowException): self.pod_manager.await_xcom_sidecar_container_start(pod=mock_pod, timeout=10, log_interval=5) mock_container_is_running.assert_any_call(mock_pod, "airflow-xcom-sidecar") + mock_container_is_terminated.assert_any_call(mock_pod, "airflow-xcom-sidecar") + + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_terminated") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running") + def test_await_xcom_sidecar_container_terminated( + self, mock_container_is_running, mock_container_is_terminated + ): + mock_pod = MagicMock() + mock_container_is_running.return_value = False + mock_container_is_terminated.return_value = True + match_error = ( + "Xcom sidecar container is already terminated! Not possible to read xcom output of task." + ) + with pytest.raises(AirflowException, match=match_error): + self.pod_manager.await_xcom_sidecar_container_start(pod=mock_pod, timeout=10, log_interval=5) + + mock_container_is_running.assert_any_call(mock_pod, "airflow-xcom-sidecar") + mock_container_is_terminated.assert_any_call(mock_pod, "airflow-xcom-sidecar") @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running") def test_await_xcom_sidecar_container_starts(self, mock_container_is_running):