From e255fb4a3a166d7ddc7c7a9513ae3ca594f59e6a Mon Sep 17 00:00:00 2001 From: HsiuChuanHsu Date: Mon, 4 Aug 2025 22:06:21 +0800 Subject: [PATCH 1/6] feat: Enhanced pod failure logging in KubernetesExecutor Add detailed logging of pod and container failure information when tasks fail, including pod phase, reasons, messages, and container states for better debugging. - Extract pod status (phase, reason, message) on task failure - Extract container state info (terminated/waiting reasons & messages) - Add exception handling for Kubernetes API failures - Only execute additional logging for FAILED task states --- .../test_kubernetes_executor.py | 124 ++++++++++++++++++ .../executors/kubernetes_executor.py | 40 ++++++ 2 files changed, 164 insertions(+) diff --git a/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_executor.py b/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_executor.py index c7e1322bc7707..12c624ab40792 100644 --- a/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_executor.py +++ b/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_executor.py @@ -16,6 +16,8 @@ # under the License. from __future__ import annotations +from unittest.mock import Mock, patch + import pytest from kubernetes_tests.test_base import ( @@ -102,3 +104,125 @@ def test_integration_run_dag_with_scheduler_failure(self): ) assert self._num_pods_in_namespace("test-namespace") == 0, "failed to delete pods in other namespace" + + @pytest.mark.execution_timeout(300) + @patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.log") + def test_pod_failure_logging_with_container_terminated(self, mock_log): + """Test that pod failure information is logged when container is terminated.""" + + from airflow.models.taskinstancekey import TaskInstanceKey + from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor + from airflow.utils.state import TaskInstanceState + + # Create a mock KubernetesExecutor instance + executor = KubernetesExecutor() + executor.kube_client = Mock() + executor.kube_scheduler = Mock() + + # Create mock pod status with terminated container + mock_container_state = Mock() + mock_container_state.terminated = Mock() + mock_container_state.terminated.reason = "Error" + mock_container_state.terminated.message = "Container failed with exit code 1" + mock_container_state.waiting = None + + mock_container_status = Mock() + mock_container_status.state = mock_container_state + + mock_pod_status = Mock() + mock_pod_status.phase = "Failed" + mock_pod_status.reason = "PodFailed" + mock_pod_status.message = "Pod execution failed" + mock_pod_status.container_statuses = [mock_container_status] + + mock_pod = Mock() + mock_pod.status = mock_pod_status + + executor.kube_client.read_namespaced_pod.return_value = mock_pod + + # Create a test task key + task_key = TaskInstanceKey(dag_id="test_dag", task_id="test_task", run_id="test_run", try_number=1) + + # Call _change_state with FAILED status + executor._change_state( + key=task_key, state=TaskInstanceState.FAILED, pod_name="test-pod", namespace="test-namespace" + ) + + # Verify that the error log was called with expected parameters + mock_log.error.assert_called_once_with( + "Pod %s in namespace %s failed. Pod phase: %s, reason: %s, message: %s, container_state: %s, container_reason: %s, container_message: %s", + "test-pod", + "test-namespace", + "Failed", + "PodFailed", + "Pod execution failed", + "terminated", + "Error", + "Container failed with exit code 1", + ) + + @pytest.mark.execution_timeout(300) + @patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.log") + def test_pod_failure_logging_exception_handling(self, mock_log): + """Test that exceptions during pod status retrieval are handled gracefully.""" + from kubernetes.client.rest import ApiException + + from airflow.models.taskinstancekey import TaskInstanceKey + from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor + from airflow.utils.state import TaskInstanceState + + # Create a mock KubernetesExecutor instance + executor = KubernetesExecutor() + executor.kube_client = Mock() + executor.kube_scheduler = Mock() + + # Make read_namespaced_pod raise an exception + executor.kube_client.read_namespaced_pod.side_effect = ApiException(status=404, reason="Not Found") + + # Create a test task key + task_key = TaskInstanceKey(dag_id="test_dag", task_id="test_task", run_id="test_run", try_number=1) + + # Call _change_state with FAILED status + executor._change_state( + key=task_key, state=TaskInstanceState.FAILED, pod_name="test-pod", namespace="test-namespace" + ) + + # Verify that the warning log was called with the correct parameters + mock_log.warning.assert_called_once() + call_args = mock_log.warning.call_args[0] + assert call_args[0] == "Failed to fetch pod failure reason for %s/%s: %s" + assert call_args[1] == "test-namespace" + assert call_args[2] == "test-pod" + # The third argument should be the exception + assert isinstance(call_args[3], ApiException) + + # Verify that error log was not called + mock_log.error.assert_not_called() + + @pytest.mark.execution_timeout(300) + @patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.log") + def test_pod_failure_logging_non_failed_state(self, mock_log): + """Test that pod failure logging only occurs for FAILED state.""" + from airflow.models.taskinstancekey import TaskInstanceKey + from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor + from airflow.utils.state import TaskInstanceState + + # Create a mock KubernetesExecutor instance + executor = KubernetesExecutor() + executor.kube_client = Mock() + executor.kube_scheduler = Mock() + + # Create a test task key + task_key = TaskInstanceKey(dag_id="test_dag", task_id="test_task", run_id="test_run", try_number=1) + + # Call _change_state with SUCCESS status + executor._change_state( + key=task_key, state=TaskInstanceState.SUCCESS, pod_name="test-pod", namespace="test-namespace" + ) + + # Verify that no failure logs were called + mock_log.error.assert_not_called() + mock_log.warning.assert_not_called() + + # Verify that kube_client methods were not called + executor.kube_client.read_namespaced_pod.assert_not_called() diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index 2f8ff5699e3da..ae970be80b4d9 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -417,6 +417,46 @@ def _change_state( if TYPE_CHECKING: assert self.kube_scheduler + if state == TaskInstanceState.FAILED: + try: + if self.kube_client: + pod = self.kube_client.read_namespaced_pod(name=pod_name, namespace=namespace) + pod_status = getattr(pod.status, "phase", None) + pod_reason = getattr(pod.status, "reason", None) + pod_message = getattr(pod.status, "message", None) + + # If containerStatuses has detailed reasons, print them as well. + container_statuses = getattr(pod.status, "container_statuses", None) + container_state = None + container_reason = None + container_message = None + if container_statuses: + for cs in container_statuses: + state_obj = cs.state + if state_obj.terminated: + container_state = "terminated" + container_reason = getattr(state_obj.terminated, "reason", None) + container_message = getattr(state_obj.terminated, "message", None) + break + if state_obj.waiting: + container_state = "waiting" + container_reason = getattr(state_obj.waiting, "reason", None) + container_message = getattr(state_obj.waiting, "message", None) + break + self.log.error( + "Pod %s in namespace %s failed. Pod phase: %s, reason: %s, message: %s, container_state: %s, container_reason: %s, container_message: %s", + pod_name, + namespace, + pod_status, + pod_reason, + pod_message, + container_state, + container_reason, + container_message, + ) + except Exception as e: + self.log.warning("Failed to fetch pod failure reason for %s/%s: %s", namespace, pod_name, e) + if state == ADOPTED: # When the task pod is adopted by another executor, # then remove the task from the current executor running queue. From 9e9731ba0c10322574271945e290260c5eeb938e Mon Sep 17 00:00:00 2001 From: HsiuChuanHsu Date: Wed, 6 Aug 2025 07:18:38 +0800 Subject: [PATCH 2/6] feat(k8s-executor): improve pod failure diagnostics and reduce API calls Move failure analysis to watcher thread, add detailed container status logging, and include task keys for better log searchability. --- .../executors/kubernetes_executor.py | 77 ++++----- .../executors/kubernetes_executor_types.py | 14 +- .../executors/kubernetes_executor_utils.py | 149 ++++++++++++++++-- 3 files changed, 180 insertions(+), 60 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index ae970be80b4d9..dc80543f50d17 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -330,11 +330,11 @@ def sync(self) -> None: while True: results = self.result_queue.get_nowait() try: - key, state, pod_name, namespace, resource_version = results + key, state, pod_name, namespace, resource_version, failure_details = results last_resource_version[namespace] = resource_version self.log.info("Changing state of %s to %s", results, state) try: - self._change_state(key, state, pod_name, namespace) + self._change_state(key, state, pod_name, namespace, failure_details) except Exception as e: self.log.exception( "Exception: %s when attempting to change state of %s to %s, re-queueing.", @@ -412,50 +412,43 @@ def _change_state( state: TaskInstanceState | str | None, pod_name: str, namespace: str, + failure_details: dict[str, Any] | None = None, session: Session = NEW_SESSION, ) -> None: if TYPE_CHECKING: assert self.kube_scheduler if state == TaskInstanceState.FAILED: - try: - if self.kube_client: - pod = self.kube_client.read_namespaced_pod(name=pod_name, namespace=namespace) - pod_status = getattr(pod.status, "phase", None) - pod_reason = getattr(pod.status, "reason", None) - pod_message = getattr(pod.status, "message", None) - - # If containerStatuses has detailed reasons, print them as well. - container_statuses = getattr(pod.status, "container_statuses", None) - container_state = None - container_reason = None - container_message = None - if container_statuses: - for cs in container_statuses: - state_obj = cs.state - if state_obj.terminated: - container_state = "terminated" - container_reason = getattr(state_obj.terminated, "reason", None) - container_message = getattr(state_obj.terminated, "message", None) - break - if state_obj.waiting: - container_state = "waiting" - container_reason = getattr(state_obj.waiting, "reason", None) - container_message = getattr(state_obj.waiting, "message", None) - break - self.log.error( - "Pod %s in namespace %s failed. Pod phase: %s, reason: %s, message: %s, container_state: %s, container_reason: %s, container_message: %s", - pod_name, - namespace, - pod_status, - pod_reason, - pod_message, - container_state, - container_reason, - container_message, - ) - except Exception as e: - self.log.warning("Failed to fetch pod failure reason for %s/%s: %s", namespace, pod_name, e) + # Use pre-collected failure details from the watcher to avoid additional API calls + if failure_details: + pod_status = failure_details.get("pod_status") + pod_reason = failure_details.get("pod_reason") + pod_message = failure_details.get("pod_message") + container_state = failure_details.get("container_state") + container_reason = failure_details.get("container_reason") + container_message = failure_details.get("container_message") + exit_code = failure_details.get("exit_code") + + task_key_str = f"{key.dag_id}.{key.task_id}.{key.try_number}" + self.log.warning( + "Task %s failed in pod %s/%s. Pod phase: %s, reason: %s, message: %s, " + "container_state: %s, container_reason: %s, container_message: %s, exit_code: %s", + task_key_str, + namespace, + pod_name, + pod_status, + pod_reason, + pod_message, + container_state, + container_reason, + container_message, + exit_code, + ) + else: + task_key_str = f"{key.dag_id}.{key.task_id}.{key.try_number}" + self.log.warning( + "Task %s failed in pod %s/%s (no details available)", task_key_str, namespace, pod_name + ) if state == ADOPTED: # When the task pod is adopted by another executor, @@ -736,12 +729,12 @@ def _flush_result_queue(self) -> None: results = self.result_queue.get_nowait() self.log.warning("Executor shutting down, flushing results=%s", results) try: - key, state, pod_name, namespace, resource_version = results + key, state, pod_name, namespace, resource_version, failure_details = results self.log.info( "Changing state of %s to %s : resource_version=%d", results, state, resource_version ) try: - self._change_state(key, state, pod_name, namespace) + self._change_state(key, state, pod_name, namespace, failure_details) except Exception as e: self.log.exception( "Ignoring exception: %s when attempting to change state of %s to %s.", diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py index 78ab517c52677..14c10273ef6f7 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py @@ -31,11 +31,15 @@ # TaskInstance key, command, configuration, pod_template_file KubernetesJobType = tuple[TaskInstanceKey, CommandType, Any, str | None] - # key, pod state, pod_name, namespace, resource_version - KubernetesResultsType = tuple[TaskInstanceKey, TaskInstanceState | str | None, str, str, str] - - # pod_name, namespace, pod state, annotations, resource_version - KubernetesWatchType = tuple[str, str, TaskInstanceState | str | None, dict[str, str], str] + # key, pod state, pod_name, namespace, resource_version, failure_details + KubernetesResultsType = tuple[ + TaskInstanceKey, TaskInstanceState | str | None, str, str, str, dict[str, Any] | None + ] + + # pod_name, namespace, pod state, annotations, resource_version, failure_details + KubernetesWatchType = tuple[ + str, str, TaskInstanceState | str | None, dict[str, str], str, dict[str, Any] | None + ] ALL_NAMESPACES = "ALL_NAMESPACES" POD_EXECUTOR_DONE_KEY = "airflow_executor_done" diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index 122cfd9f8fab6..9f31bce6f9975 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -210,22 +210,115 @@ def process_status( if POD_REVOKED_KEY in pod.metadata.labels.keys(): return + # Collect failure details for failed pods + failure_details = None + if status == "Failed": + try: + pod_status = getattr(pod.status, "phase", None) + pod_reason = getattr(pod.status, "reason", None) + pod_message = getattr(pod.status, "message", None) + + # Container status analysis - check both init and main containers + container_info = {} + + # Check init containers first (they run before main containers) + init_container_statuses = getattr(pod.status, "init_container_statuses", None) + if init_container_statuses: + for cs in init_container_statuses: + state_obj = cs.state + if state_obj.terminated: + terminated_reason = getattr(state_obj.terminated, "reason", None) + exit_code = getattr(state_obj.terminated, "exit_code", 0) + # Only treat as failure if exit code != 0 AND reason is not "Completed" + if exit_code != 0 and terminated_reason != "Completed": + # Init container failed + container_info = { + "state": "terminated", + "reason": terminated_reason, + "message": getattr(state_obj.terminated, "message", None), + "exit_code": exit_code, + "container_type": "init", + "container_name": getattr(cs, "name", "unknown"), + } + break + elif state_obj.waiting: + container_info = { + "state": "waiting", + "reason": getattr(state_obj.waiting, "reason", None), + "message": getattr(state_obj.waiting, "message", None), + "container_type": "init", + "container_name": getattr(cs, "name", "unknown"), + } + # Continue to look for terminated state in other init containers + + # If no init container failure found, check main containers + if not container_info: + container_statuses = getattr(pod.status, "container_statuses", None) + if container_statuses: + for cs in container_statuses: + state_obj = cs.state + # Prioritize terminated state for final failure details + if state_obj.terminated: + terminated_reason = getattr(state_obj.terminated, "reason", None) + exit_code = getattr(state_obj.terminated, "exit_code", 0) + # Only treat as failure if exit code != 0 AND reason is not "Completed" + if exit_code != 0 and terminated_reason != "Completed": + container_info = { + "state": "terminated", + "reason": terminated_reason, + "message": getattr(state_obj.terminated, "message", None), + "exit_code": exit_code, + "container_type": "main", + "container_name": getattr(cs, "name", "unknown"), + } + break + elif state_obj.waiting: + container_info = { + "state": "waiting", + "reason": getattr(state_obj.waiting, "reason", None), + "message": getattr(state_obj.waiting, "message", None), + "container_type": "main", + "container_name": getattr(cs, "name", "unknown"), + } + # Continue to look for terminated state in other containers + + failure_details = { + "pod_status": pod_status, + "pod_reason": pod_reason, + "pod_message": pod_message, + **container_info, + } + except Exception as e: + self.log.warning( + "Failed to collect pod failure details for %s/%s: %s", namespace, pod_name, e + ) + annotations_string = annotations_for_logging_task_metadata(annotations) if event["type"] == "DELETED" and not pod.metadata.deletion_timestamp: # This will happen only when the task pods are adopted by another executor. # So, there is no change in the pod state. # However, need to free the executor slot from the current executor. self.log.info("Event: pod %s adopted, annotations: %s", pod_name, annotations_string) - self.watcher_queue.put((pod_name, namespace, ADOPTED, annotations, resource_version)) + self.watcher_queue.put((pod_name, namespace, ADOPTED, annotations, resource_version, None)) elif hasattr(pod.status, "reason") and pod.status.reason == "ProviderFailed": # Most likely this happens due to Kubernetes setup (virtual kubelet, virtual nodes, etc.) - self.log.error( - "Event: %s failed to start with reason ProviderFailed, annotations: %s", + key = annotations_to_key(annotations=annotations) + task_key_str = f"{key.dag_id}.{key.task_id}.{key.try_number}" if key else "unknown" + self.log.warning( + "Event: %s failed to start with reason ProviderFailed, task: %s, annotations: %s", pod_name, + task_key_str, annotations_string, ) self.watcher_queue.put( - (pod_name, namespace, TaskInstanceState.FAILED, annotations, resource_version) + ( + pod_name, + namespace, + TaskInstanceState.FAILED, + annotations, + resource_version, + failure_details, + ) ) elif status == "Pending": # deletion_timestamp is set by kube server when a graceful deletion is requested. @@ -254,14 +347,26 @@ def process_status( and container_status_state["waiting"]["message"] == "pull QPS exceeded" ): continue - self.log.error( - "Event: %s has container %s with fatal reason %s", + key = annotations_to_key(annotations=annotations) + task_key_str = ( + f"{key.dag_id}.{key.task_id}.{key.try_number}" if key else "unknown" + ) + self.log.warning( + "Event: %s has container %s with fatal reason %s, task: %s", pod_name, container_status["name"], container_status_state["waiting"]["reason"], + task_key_str, ) self.watcher_queue.put( - (pod_name, namespace, TaskInstanceState.FAILED, annotations, resource_version) + ( + pod_name, + namespace, + TaskInstanceState.FAILED, + annotations, + resource_version, + failure_details, + ) ) break else: @@ -269,13 +374,24 @@ def process_status( else: self.log.debug("Event: %s Pending, annotations: %s", pod_name, annotations_string) elif status == "Failed": - self.log.error("Event: %s Failed, annotations: %s", pod_name, annotations_string) + key = annotations_to_key(annotations=annotations) + task_key_str = f"{key.dag_id}.{key.task_id}.{key.try_number}" if key else "unknown" + self.log.warning( + "Event: %s Failed, task: %s, annotations: %s", pod_name, task_key_str, annotations_string + ) self.watcher_queue.put( - (pod_name, namespace, TaskInstanceState.FAILED, annotations, resource_version) + ( + pod_name, + namespace, + TaskInstanceState.FAILED, + annotations, + resource_version, + failure_details, + ) ) elif status == "Succeeded": self.log.info("Event: %s Succeeded, annotations: %s", pod_name, annotations_string) - self.watcher_queue.put((pod_name, namespace, None, annotations, resource_version)) + self.watcher_queue.put((pod_name, namespace, None, annotations, resource_version, None)) elif status == "Running": # deletion_timestamp is set by kube server when a graceful deletion is requested. # since kube server have received request to delete pod set TI state failed @@ -286,7 +402,14 @@ def process_status( annotations_string, ) self.watcher_queue.put( - (pod_name, namespace, TaskInstanceState.FAILED, annotations, resource_version) + ( + pod_name, + namespace, + TaskInstanceState.FAILED, + annotations, + resource_version, + failure_details, + ) ) else: self.log.info("Event: %s is Running, annotations: %s", pod_name, annotations_string) @@ -504,7 +627,7 @@ def sync(self) -> None: def process_watcher_task(self, task: KubernetesWatchType) -> None: """Process the task by watcher.""" - pod_name, namespace, state, annotations, resource_version = task + pod_name, namespace, state, annotations, resource_version, failure_details = task self.log.debug( "Attempting to finish pod; pod_name: %s; state: %s; annotations: %s", pod_name, @@ -514,7 +637,7 @@ def process_watcher_task(self, task: KubernetesWatchType) -> None: key = annotations_to_key(annotations=annotations) if key: self.log.debug("finishing job %s - %s (%s)", key, state, pod_name) - self.result_queue.put((key, state, pod_name, namespace, resource_version)) + self.result_queue.put((key, state, pod_name, namespace, resource_version, failure_details)) def _flush_watcher_queue(self) -> None: self.log.debug("Executor shutting down, watcher_queue approx. size=%d", self.watcher_queue.qsize()) From f046a2197c4b908b4b7490f321f5129f12a46c9f Mon Sep 17 00:00:00 2001 From: HsiuChuanHsu Date: Mon, 11 Aug 2025 07:32:56 +0800 Subject: [PATCH 3/6] refactor: Extract pod failure analysis to dedicated TypedDict functions - Add FailureDetails TypedDict for type-safe failure information - Extract collect_pod_failure_details() function for cleaner separation of concerns - Create _analyze_init_containers() and _analyze_main_containers() helper functions --- .../executors/kubernetes_executor.py | 10 +- .../executors/kubernetes_executor_types.py | 22 +- .../executors/kubernetes_executor_utils.py | 213 +++++++++++------- 3 files changed, 164 insertions(+), 81 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index dc80543f50d17..dddaf7b00c43e 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -66,6 +66,7 @@ from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import ( ADOPTED, POD_EXECUTOR_DONE_KEY, + FailureDetails, ) from airflow.providers.cncf.kubernetes.kube_config import KubeConfig from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import annotations_to_key @@ -412,7 +413,7 @@ def _change_state( state: TaskInstanceState | str | None, pod_name: str, namespace: str, - failure_details: dict[str, Any] | None = None, + failure_details: FailureDetails | None = None, session: Session = NEW_SESSION, ) -> None: if TYPE_CHECKING: @@ -428,17 +429,22 @@ def _change_state( container_reason = failure_details.get("container_reason") container_message = failure_details.get("container_message") exit_code = failure_details.get("exit_code") + container_type = failure_details.get("container_type") + container_name = failure_details.get("container_name") task_key_str = f"{key.dag_id}.{key.task_id}.{key.try_number}" self.log.warning( "Task %s failed in pod %s/%s. Pod phase: %s, reason: %s, message: %s, " - "container_state: %s, container_reason: %s, container_message: %s, exit_code: %s", + "container_type: %s, container_name: %s, container_state: %s, container_reason: %s, " + "container_message: %s, exit_code: %s", task_key_str, namespace, pod_name, pod_status, pod_reason, pod_message, + container_type, + container_name, container_state, container_reason, container_message, diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py index 14c10273ef6f7..c2f8db91e4aff 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py @@ -16,9 +16,25 @@ # under the License. from __future__ import annotations -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, TypedDict ADOPTED = "adopted" + + +class FailureDetails(TypedDict, total=False): + """Detailed information about pod/container failure.""" + + pod_status: str | None + pod_reason: str | None + pod_message: str | None + container_state: str | None + container_reason: str | None + container_message: str | None + exit_code: int | None + container_type: str | None # "init" or "main" + container_name: str | None + + if TYPE_CHECKING: from collections.abc import Sequence @@ -33,12 +49,12 @@ # key, pod state, pod_name, namespace, resource_version, failure_details KubernetesResultsType = tuple[ - TaskInstanceKey, TaskInstanceState | str | None, str, str, str, dict[str, Any] | None + TaskInstanceKey, TaskInstanceState | str | None, str, str, str, FailureDetails | None ] # pod_name, namespace, pod state, annotations, resource_version, failure_details KubernetesWatchType = tuple[ - str, str, TaskInstanceState | str | None, dict[str, str], str, dict[str, Any] | None + str, str, TaskInstanceState | str | None, dict[str, str], str, FailureDetails | None ] ALL_NAMESPACES = "ALL_NAMESPACES" diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index 9f31bce6f9975..e9105f7efd9e4 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -21,7 +21,7 @@ import multiprocessing import time from queue import Empty, Queue -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, cast from kubernetes import client, watch from kubernetes.client.rest import ApiException @@ -34,6 +34,7 @@ ALL_NAMESPACES, POD_EXECUTOR_DONE_KEY, POD_REVOKED_KEY, + FailureDetails, ) from airflow.providers.cncf.kubernetes.kube_client import get_kube_client from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import ( @@ -210,84 +211,11 @@ def process_status( if POD_REVOKED_KEY in pod.metadata.labels.keys(): return - # Collect failure details for failed pods + # Collect failure details for failed pods using the new function failure_details = None if status == "Failed": try: - pod_status = getattr(pod.status, "phase", None) - pod_reason = getattr(pod.status, "reason", None) - pod_message = getattr(pod.status, "message", None) - - # Container status analysis - check both init and main containers - container_info = {} - - # Check init containers first (they run before main containers) - init_container_statuses = getattr(pod.status, "init_container_statuses", None) - if init_container_statuses: - for cs in init_container_statuses: - state_obj = cs.state - if state_obj.terminated: - terminated_reason = getattr(state_obj.terminated, "reason", None) - exit_code = getattr(state_obj.terminated, "exit_code", 0) - # Only treat as failure if exit code != 0 AND reason is not "Completed" - if exit_code != 0 and terminated_reason != "Completed": - # Init container failed - container_info = { - "state": "terminated", - "reason": terminated_reason, - "message": getattr(state_obj.terminated, "message", None), - "exit_code": exit_code, - "container_type": "init", - "container_name": getattr(cs, "name", "unknown"), - } - break - elif state_obj.waiting: - container_info = { - "state": "waiting", - "reason": getattr(state_obj.waiting, "reason", None), - "message": getattr(state_obj.waiting, "message", None), - "container_type": "init", - "container_name": getattr(cs, "name", "unknown"), - } - # Continue to look for terminated state in other init containers - - # If no init container failure found, check main containers - if not container_info: - container_statuses = getattr(pod.status, "container_statuses", None) - if container_statuses: - for cs in container_statuses: - state_obj = cs.state - # Prioritize terminated state for final failure details - if state_obj.terminated: - terminated_reason = getattr(state_obj.terminated, "reason", None) - exit_code = getattr(state_obj.terminated, "exit_code", 0) - # Only treat as failure if exit code != 0 AND reason is not "Completed" - if exit_code != 0 and terminated_reason != "Completed": - container_info = { - "state": "terminated", - "reason": terminated_reason, - "message": getattr(state_obj.terminated, "message", None), - "exit_code": exit_code, - "container_type": "main", - "container_name": getattr(cs, "name", "unknown"), - } - break - elif state_obj.waiting: - container_info = { - "state": "waiting", - "reason": getattr(state_obj.waiting, "reason", None), - "message": getattr(state_obj.waiting, "message", None), - "container_type": "main", - "container_name": getattr(cs, "name", "unknown"), - } - # Continue to look for terminated state in other containers - - failure_details = { - "pod_status": pod_status, - "pod_reason": pod_reason, - "pod_message": pod_message, - **container_info, - } + failure_details = collect_pod_failure_details(pod) except Exception as e: self.log.warning( "Failed to collect pod failure details for %s/%s: %s", namespace, pod_name, e @@ -425,6 +353,139 @@ def process_status( ) +def collect_pod_failure_details(pod: k8s.V1Pod) -> FailureDetails | None: + """ + Collect detailed failure information from a failed pod. + + Analyzes both init containers and main containers to determine the root cause + of pod failure, prioritizing terminated containers with non-zero exit codes. + + Args: + pod: The Kubernetes V1Pod object to analyze + + Returns: + FailureDetails dict with failure information, or None if no failure details found + """ + if not pod.status or pod.status.phase != "Failed": + return None + + try: + # Basic pod-level information + failure_details: FailureDetails = { + "pod_status": getattr(pod.status, "phase", None), + "pod_reason": getattr(pod.status, "reason", None), + "pod_message": getattr(pod.status, "message", None), + } + + # Check init containers first (they run before main containers) + container_failure = _analyze_init_containers(pod.status) + + # If no init container failure found, check main containers + if not container_failure: + container_failure = _analyze_main_containers(pod.status) + + # Merge container failure details + if container_failure: + failure_details.update(container_failure) + + return failure_details + + except Exception: + # Return basic pod info if container analysis fails + return { + "pod_status": getattr(pod.status, "phase", None), + "pod_reason": getattr(pod.status, "reason", None), + "pod_message": getattr(pod.status, "message", None), + } + + +def _analyze_init_containers(pod_status: k8s.V1PodStatus) -> FailureDetails | None: + """Analyze init container statuses for failure details.""" + init_container_statuses = getattr(pod_status, "init_container_statuses", None) + if not init_container_statuses: + return None + + waiting_info: FailureDetails | None = None + + for cs in init_container_statuses: + state_obj = cs.state + if state_obj.terminated: + terminated_reason = getattr(state_obj.terminated, "reason", None) + exit_code = getattr(state_obj.terminated, "exit_code", 0) + + # Only treat as failure if exit code != 0 AND reason is not "Completed" + if exit_code != 0 and terminated_reason != "Completed": + return cast( + "FailureDetails", + { + "container_state": "terminated", + "container_reason": terminated_reason, + "container_message": getattr(state_obj.terminated, "message", None), + "exit_code": exit_code, + "container_type": "init", + "container_name": getattr(cs, "name", "unknown"), + }, + ) + elif state_obj.waiting: + # Record waiting state but continue looking for terminated containers + waiting_info = cast( + "FailureDetails", + { + "container_state": "waiting", + "container_reason": getattr(state_obj.waiting, "reason", None), + "container_message": getattr(state_obj.waiting, "message", None), + "container_type": "init", + "container_name": getattr(cs, "name", "unknown"), + }, + ) + + # If we only found waiting containers, return the last one + return waiting_info + + +def _analyze_main_containers(pod_status: k8s.V1PodStatus) -> FailureDetails | None: + """Analyze main container statuses for failure details.""" + container_statuses = getattr(pod_status, "container_statuses", None) + if not container_statuses: + return None + + waiting_info: FailureDetails | None = None + + for cs in container_statuses: + state_obj = cs.state + if state_obj.terminated: + terminated_reason = getattr(state_obj.terminated, "reason", None) + exit_code = getattr(state_obj.terminated, "exit_code", 0) + + # Only treat as failure if exit code != 0 AND reason is not "Completed" + if exit_code != 0 and terminated_reason != "Completed": + return cast( + "FailureDetails", + { + "container_state": "terminated", + "container_reason": terminated_reason, + "container_message": getattr(state_obj.terminated, "message", None), + "exit_code": exit_code, + "container_type": "main", + "container_name": getattr(cs, "name", "unknown"), + }, + ) + elif state_obj.waiting: + # Record waiting state but continue looking for terminated containers + waiting_info = cast( + "FailureDetails", + { + "container_state": "waiting", + "container_reason": getattr(state_obj.waiting, "reason", None), + "container_message": getattr(state_obj.waiting, "message", None), + "container_type": "main", + "container_name": getattr(cs, "name", "unknown"), + }, + ) + + return waiting_info + + class AirflowKubernetesScheduler(LoggingMixin): """Airflow Scheduler for Kubernetes.""" From ed8da67707ccb4212aec63b3f353fc44da46599a Mon Sep 17 00:00:00 2001 From: HsiuChuanHsu Date: Mon, 11 Aug 2025 21:15:50 +0800 Subject: [PATCH 4/6] feat(kubernetes): enhance pod failure diagnostics with TypedDict and modular design 1. Introduce FailureDetails TypedDict 2. Implement collect_pod_failure_details Function 3. Enhanced Failure Logging --- .../test_kubernetes_executor.py | 91 +++++++++---------- .../executors/kubernetes_executor_utils.py | 65 ++++--------- .../executors/test_kubernetes_executor.py | 18 ++-- 3 files changed, 75 insertions(+), 99 deletions(-) diff --git a/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_executor.py b/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_executor.py index 12c624ab40792..4d14c75864df1 100644 --- a/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_executor.py +++ b/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_executor.py @@ -16,10 +16,14 @@ # under the License. from __future__ import annotations +from typing import TYPE_CHECKING from unittest.mock import Mock, patch import pytest +if TYPE_CHECKING: + from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import FailureDetails + from kubernetes_tests.test_base import ( EXECUTOR, BaseK8STest, # isort:skip (needed to workaround isort bug) @@ -116,88 +120,83 @@ def test_pod_failure_logging_with_container_terminated(self, mock_log): # Create a mock KubernetesExecutor instance executor = KubernetesExecutor() - executor.kube_client = Mock() executor.kube_scheduler = Mock() - # Create mock pod status with terminated container - mock_container_state = Mock() - mock_container_state.terminated = Mock() - mock_container_state.terminated.reason = "Error" - mock_container_state.terminated.message = "Container failed with exit code 1" - mock_container_state.waiting = None - - mock_container_status = Mock() - mock_container_status.state = mock_container_state - - mock_pod_status = Mock() - mock_pod_status.phase = "Failed" - mock_pod_status.reason = "PodFailed" - mock_pod_status.message = "Pod execution failed" - mock_pod_status.container_statuses = [mock_container_status] - - mock_pod = Mock() - mock_pod.status = mock_pod_status - - executor.kube_client.read_namespaced_pod.return_value = mock_pod + # Create test failure details + failure_details: FailureDetails = { + "pod_status": "Failed", + "pod_reason": "PodFailed", + "pod_message": "Pod execution failed", + "container_state": "terminated", + "container_reason": "Error", + "container_message": "Container failed with exit code 1", + "exit_code": 1, + "container_type": "main", + "container_name": "test-container", + } # Create a test task key task_key = TaskInstanceKey(dag_id="test_dag", task_id="test_task", run_id="test_run", try_number=1) - # Call _change_state with FAILED status + # Call _change_state with FAILED status and failure details executor._change_state( - key=task_key, state=TaskInstanceState.FAILED, pod_name="test-pod", namespace="test-namespace" + key=task_key, + state=TaskInstanceState.FAILED, + pod_name="test-pod", + namespace="test-namespace", + failure_details=failure_details, ) - # Verify that the error log was called with expected parameters - mock_log.error.assert_called_once_with( - "Pod %s in namespace %s failed. Pod phase: %s, reason: %s, message: %s, container_state: %s, container_reason: %s, container_message: %s", - "test-pod", + # Verify that the warning log was called with expected parameters + mock_log.warning.assert_called_once_with( + "Task %s failed in pod %s/%s. Pod phase: %s, reason: %s, message: %s, " + "container_type: %s, container_name: %s, container_state: %s, container_reason: %s, " + "container_message: %s, exit_code: %s", + "test_dag.test_task.1", "test-namespace", + "test-pod", "Failed", "PodFailed", "Pod execution failed", + "main", + "test-container", "terminated", "Error", "Container failed with exit code 1", + 1, ) @pytest.mark.execution_timeout(300) @patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.log") def test_pod_failure_logging_exception_handling(self, mock_log): - """Test that exceptions during pod status retrieval are handled gracefully.""" - from kubernetes.client.rest import ApiException - + """Test that failures without details are handled gracefully.""" from airflow.models.taskinstancekey import TaskInstanceKey from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor from airflow.utils.state import TaskInstanceState # Create a mock KubernetesExecutor instance executor = KubernetesExecutor() - executor.kube_client = Mock() executor.kube_scheduler = Mock() - # Make read_namespaced_pod raise an exception - executor.kube_client.read_namespaced_pod.side_effect = ApiException(status=404, reason="Not Found") - # Create a test task key task_key = TaskInstanceKey(dag_id="test_dag", task_id="test_task", run_id="test_run", try_number=1) - # Call _change_state with FAILED status + # Call _change_state with FAILED status but no failure details executor._change_state( - key=task_key, state=TaskInstanceState.FAILED, pod_name="test-pod", namespace="test-namespace" + key=task_key, + state=TaskInstanceState.FAILED, + pod_name="test-pod", + namespace="test-namespace", + failure_details=None, ) # Verify that the warning log was called with the correct parameters - mock_log.warning.assert_called_once() - call_args = mock_log.warning.call_args[0] - assert call_args[0] == "Failed to fetch pod failure reason for %s/%s: %s" - assert call_args[1] == "test-namespace" - assert call_args[2] == "test-pod" - # The third argument should be the exception - assert isinstance(call_args[3], ApiException) - - # Verify that error log was not called - mock_log.error.assert_not_called() + mock_log.warning.assert_called_once_with( + "Task %s failed in pod %s/%s (no details available)", + "test_dag.test_task.1", + "test-namespace", + "test-pod", + ) @pytest.mark.execution_timeout(300) @patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.log") diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index e9105f7efd9e4..cefa5f52bb984 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -391,6 +391,13 @@ def collect_pod_failure_details(pod: k8s.V1Pod) -> FailureDetails | None: return failure_details except Exception: + # Log unexpected exception for debugging + import logging + + logging.getLogger(__name__).exception( + "Unexpected error while collecting pod failure details for pod %s", + getattr(pod.metadata, "name", "unknown"), + ) # Return basic pod info if container analysis fails return { "pod_status": getattr(pod.status, "phase", None), @@ -399,15 +406,14 @@ def collect_pod_failure_details(pod: k8s.V1Pod) -> FailureDetails | None: } -def _analyze_init_containers(pod_status: k8s.V1PodStatus) -> FailureDetails | None: - """Analyze init container statuses for failure details.""" - init_container_statuses = getattr(pod_status, "init_container_statuses", None) - if not init_container_statuses: +def _analyze_containers(container_statuses: list | None, container_type: str) -> FailureDetails | None: + """Analyze container statuses for failure details.""" + if not container_statuses: return None waiting_info: FailureDetails | None = None - for cs in init_container_statuses: + for cs in container_statuses: state_obj = cs.state if state_obj.terminated: terminated_reason = getattr(state_obj.terminated, "reason", None) @@ -422,7 +428,7 @@ def _analyze_init_containers(pod_status: k8s.V1PodStatus) -> FailureDetails | No "container_reason": terminated_reason, "container_message": getattr(state_obj.terminated, "message", None), "exit_code": exit_code, - "container_type": "init", + "container_type": container_type, "container_name": getattr(cs, "name", "unknown"), }, ) @@ -434,7 +440,7 @@ def _analyze_init_containers(pod_status: k8s.V1PodStatus) -> FailureDetails | No "container_state": "waiting", "container_reason": getattr(state_obj.waiting, "reason", None), "container_message": getattr(state_obj.waiting, "message", None), - "container_type": "init", + "container_type": container_type, "container_name": getattr(cs, "name", "unknown"), }, ) @@ -443,47 +449,16 @@ def _analyze_init_containers(pod_status: k8s.V1PodStatus) -> FailureDetails | No return waiting_info +def _analyze_init_containers(pod_status: k8s.V1PodStatus) -> FailureDetails | None: + """Analyze init container statuses for failure details.""" + init_container_statuses = getattr(pod_status, "init_container_statuses", None) + return _analyze_containers(init_container_statuses, "init") + + def _analyze_main_containers(pod_status: k8s.V1PodStatus) -> FailureDetails | None: """Analyze main container statuses for failure details.""" container_statuses = getattr(pod_status, "container_statuses", None) - if not container_statuses: - return None - - waiting_info: FailureDetails | None = None - - for cs in container_statuses: - state_obj = cs.state - if state_obj.terminated: - terminated_reason = getattr(state_obj.terminated, "reason", None) - exit_code = getattr(state_obj.terminated, "exit_code", 0) - - # Only treat as failure if exit code != 0 AND reason is not "Completed" - if exit_code != 0 and terminated_reason != "Completed": - return cast( - "FailureDetails", - { - "container_state": "terminated", - "container_reason": terminated_reason, - "container_message": getattr(state_obj.terminated, "message", None), - "exit_code": exit_code, - "container_type": "main", - "container_name": getattr(cs, "name", "unknown"), - }, - ) - elif state_obj.waiting: - # Record waiting state but continue looking for terminated containers - waiting_info = cast( - "FailureDetails", - { - "container_state": "waiting", - "container_reason": getattr(state_obj.waiting, "reason", None), - "container_message": getattr(state_obj.waiting, "message", None), - "container_type": "main", - "container_name": getattr(cs, "name", "unknown"), - }, - ) - - return waiting_info + return _analyze_containers(container_statuses, "main") class AirflowKubernetesScheduler(LoggingMixin): diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index 31899080ceaf1..12af733611ec4 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -53,7 +53,7 @@ get_logs_task_metadata, ) from airflow.providers.standard.operators.empty import EmptyOperator -from airflow.utils import timezone +from airflow.utils import timezone # type: ignore[attr-defined] from airflow.utils.state import State, TaskInstanceState from tests_common.test_utils.config import conf_vars @@ -675,7 +675,7 @@ def test_change_state_running(self, mock_get_kube_client, mock_kubernetes_job_wa executor = self.kubernetes_executor executor.start() try: - key = ("dag_id", "task_id", "run_id", "try_number1") + key = TaskInstanceKey(dag_id="dag_id", task_id="task_id", run_id="run_id", try_number=1) executor.running = {key} executor._change_state(key, State.RUNNING, "pod_name", "default") assert executor.event_buffer[key][0] == State.RUNNING @@ -693,7 +693,7 @@ def test_change_state_success(self, mock_delete_pod, mock_get_kube_client, mock_ executor = self.kubernetes_executor executor.start() try: - key = ("dag_id", "task_id", "run_id", "try_number2") + key = TaskInstanceKey(dag_id="dag_id", task_id="task_id", run_id="run_id", try_number=2) executor.running = {key} executor._change_state(key, State.SUCCESS, "pod_name", "default") assert executor.event_buffer[key][0] == State.SUCCESS @@ -718,7 +718,7 @@ def test_change_state_failed_no_deletion( executor.kube_config.delete_worker_pods_on_failure = False executor.start() try: - key = ("dag_id", "task_id", "run_id", "try_number3") + key = TaskInstanceKey(dag_id="dag_id", task_id="task_id", run_id="run_id", try_number=3) executor.running = {key} executor._change_state(key, State.FAILED, "pod_id", "test-namespace") assert executor.event_buffer[key][0] == State.FAILED @@ -769,7 +769,7 @@ def test_change_state_adopted(self, mock_delete_pod, mock_get_kube_client, mock_ executor = self.kubernetes_executor executor.start() try: - key = ("dag_id", "task_id", "run_id", "try_number2") + key = TaskInstanceKey(dag_id="dag_id", task_id="task_id", run_id="run_id", try_number=2) executor.running = {key} executor._change_state(key, ADOPTED, "pod_name", "default") assert len(executor.event_buffer) == 0 @@ -785,7 +785,7 @@ def test_change_state_key_not_in_running(self, mock_get_kube_client, mock_kubern executor = self.kubernetes_executor executor.start() try: - key = ("dag_id", "task_id", "run_id", "try_number1") + key = TaskInstanceKey(dag_id="dag_id", task_id="task_id", run_id="run_id", try_number=1) executor.running = set() executor._change_state(key, State.SUCCESS, "pod_name", "default") assert executor.event_buffer.get(key) is None @@ -833,7 +833,7 @@ def test_change_state_skip_pod_deletion( executor.start() try: - key = ("dag_id", "task_id", "run_id", "try_number2") + key = TaskInstanceKey(dag_id="dag_id", task_id="task_id", run_id="run_id", try_number=2) executor.running = {key} executor._change_state(key, State.SUCCESS, "pod_name", "test-namespace") assert executor.event_buffer[key][0] == State.SUCCESS @@ -859,7 +859,7 @@ def test_change_state_failed_pod_deletion( executor.start() try: - key = ("dag_id", "task_id", "run_id", "try_number2") + key = TaskInstanceKey(dag_id="dag_id", task_id="task_id", run_id="run_id", try_number=2) executor.running = {key} executor._change_state(key, State.FAILED, "pod_name", "test-namespace") assert executor.event_buffer[key][0] == State.FAILED @@ -1483,6 +1483,7 @@ def assert_watcher_queue_called_once_with_state(self, state): state, self.core_annotations, self.pod.metadata.resource_version, + mock.ANY, # failure_details can be any value including None ) ) @@ -1719,6 +1720,7 @@ def test_process_status_pod_adopted(self, ti_state): ADOPTED, self.core_annotations, self.pod.metadata.resource_version, + None, # failure_details is None for ADOPTED state ) ) From 71adafc5fe6ac92eb9092cfa34d167d1f64182f7 Mon Sep 17 00:00:00 2001 From: HsiuChuanHsu Date: Tue, 12 Aug 2025 21:44:40 +0800 Subject: [PATCH 5/6] fix(kubernetes): improve type safety with Literal types for container analysis --- .../cncf/kubernetes/executors/kubernetes_executor_types.py | 4 ++-- .../cncf/kubernetes/executors/kubernetes_executor_utils.py | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py index c2f8db91e4aff..a475959e588da 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py @@ -16,7 +16,7 @@ # under the License. from __future__ import annotations -from typing import TYPE_CHECKING, Any, TypedDict +from typing import TYPE_CHECKING, Any, Literal, TypedDict ADOPTED = "adopted" @@ -31,7 +31,7 @@ class FailureDetails(TypedDict, total=False): container_reason: str | None container_message: str | None exit_code: int | None - container_type: str | None # "init" or "main" + container_type: Literal["init", "main"] | None container_name: str | None diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index cefa5f52bb984..7895b081173d9 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -21,7 +21,7 @@ import multiprocessing import time from queue import Empty, Queue -from typing import TYPE_CHECKING, Any, cast +from typing import TYPE_CHECKING, Any, Literal, cast from kubernetes import client, watch from kubernetes.client.rest import ApiException @@ -406,7 +406,9 @@ def collect_pod_failure_details(pod: k8s.V1Pod) -> FailureDetails | None: } -def _analyze_containers(container_statuses: list | None, container_type: str) -> FailureDetails | None: +def _analyze_containers( + container_statuses: list[k8s.V1ContainerStatus] | None, container_type: Literal["init", "main"] +) -> FailureDetails | None: """Analyze container statuses for failure details.""" if not container_statuses: return None From 9fad3aff95be947a1b880eaab013d16652fb43b8 Mon Sep 17 00:00:00 2001 From: HsiuChuanHsu Date: Thu, 21 Aug 2025 07:23:07 +0800 Subject: [PATCH 6/6] feat: enhance pod failure logging in KubernetesExecutor with detailed failure analysis - Pass logger as parameter to collect_pod_failure_details for consistent logging context - Move failure details collection logic to only execute for Failed status pods for better performance - Update test imports to handle timezone module location changes between Airflow versions --- .../executors/kubernetes_executor_utils.py | 31 +++++++++---------- .../executors/test_kubernetes_executor.py | 7 ++++- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index 7895b081173d9..43a03ada7d97e 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -211,16 +211,6 @@ def process_status( if POD_REVOKED_KEY in pod.metadata.labels.keys(): return - # Collect failure details for failed pods using the new function - failure_details = None - if status == "Failed": - try: - failure_details = collect_pod_failure_details(pod) - except Exception as e: - self.log.warning( - "Failed to collect pod failure details for %s/%s: %s", namespace, pod_name, e - ) - annotations_string = annotations_for_logging_task_metadata(annotations) if event["type"] == "DELETED" and not pod.metadata.deletion_timestamp: # This will happen only when the task pods are adopted by another executor. @@ -245,7 +235,7 @@ def process_status( TaskInstanceState.FAILED, annotations, resource_version, - failure_details, + None, ) ) elif status == "Pending": @@ -293,7 +283,7 @@ def process_status( TaskInstanceState.FAILED, annotations, resource_version, - failure_details, + None, ) ) break @@ -302,6 +292,14 @@ def process_status( else: self.log.debug("Event: %s Pending, annotations: %s", pod_name, annotations_string) elif status == "Failed": + # Collect failure details for failed pods + try: + failure_details = collect_pod_failure_details(pod, self.log) + except Exception as e: + self.log.warning( + "Failed to collect pod failure details for %s/%s: %s", namespace, pod_name, e + ) + key = annotations_to_key(annotations=annotations) task_key_str = f"{key.dag_id}.{key.task_id}.{key.try_number}" if key else "unknown" self.log.warning( @@ -336,7 +334,7 @@ def process_status( TaskInstanceState.FAILED, annotations, resource_version, - failure_details, + None, ) ) else: @@ -353,7 +351,7 @@ def process_status( ) -def collect_pod_failure_details(pod: k8s.V1Pod) -> FailureDetails | None: +def collect_pod_failure_details(pod: k8s.V1Pod, logger) -> FailureDetails | None: """ Collect detailed failure information from a failed pod. @@ -362,6 +360,7 @@ def collect_pod_failure_details(pod: k8s.V1Pod) -> FailureDetails | None: Args: pod: The Kubernetes V1Pod object to analyze + logger: Logger instance to use for error logging Returns: FailureDetails dict with failure information, or None if no failure details found @@ -392,9 +391,7 @@ def collect_pod_failure_details(pod: k8s.V1Pod) -> FailureDetails | None: except Exception: # Log unexpected exception for debugging - import logging - - logging.getLogger(__name__).exception( + logger.exception( "Unexpected error while collecting pod failure details for pod %s", getattr(pod.metadata, "name", "unknown"), ) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index 12af733611ec4..c08ed6686222f 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -53,7 +53,12 @@ get_logs_task_metadata, ) from airflow.providers.standard.operators.empty import EmptyOperator -from airflow.utils import timezone # type: ignore[attr-defined] + +try: + from airflow.sdk import timezone +except ImportError: + # Fallback for older Airflow location where timezone is in utils + from airflow.utils import timezone # type: ignore[attr-defined,no-redef] from airflow.utils.state import State, TaskInstanceState from tests_common.test_utils.config import conf_vars