diff --git a/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_executor.py b/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_executor.py index c7e1322bc7707..4d14c75864df1 100644 --- a/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_executor.py +++ b/kubernetes-tests/tests/kubernetes_tests/test_kubernetes_executor.py @@ -16,8 +16,14 @@ # under the License. from __future__ import annotations +from typing import TYPE_CHECKING +from unittest.mock import Mock, patch + import pytest +if TYPE_CHECKING: + from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import FailureDetails + from kubernetes_tests.test_base import ( EXECUTOR, BaseK8STest, # isort:skip (needed to workaround isort bug) @@ -102,3 +108,120 @@ def test_integration_run_dag_with_scheduler_failure(self): ) assert self._num_pods_in_namespace("test-namespace") == 0, "failed to delete pods in other namespace" + + @pytest.mark.execution_timeout(300) + @patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.log") + def test_pod_failure_logging_with_container_terminated(self, mock_log): + """Test that pod failure information is logged when container is terminated.""" + + from airflow.models.taskinstancekey import TaskInstanceKey + from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor + from airflow.utils.state import TaskInstanceState + + # Create a mock KubernetesExecutor instance + executor = KubernetesExecutor() + executor.kube_scheduler = Mock() + + # Create test failure details + failure_details: FailureDetails = { + "pod_status": "Failed", + "pod_reason": "PodFailed", + "pod_message": "Pod execution failed", + "container_state": "terminated", + "container_reason": "Error", + "container_message": "Container failed with exit code 1", + "exit_code": 1, + "container_type": "main", + "container_name": "test-container", + } + + # Create a test task key + task_key = TaskInstanceKey(dag_id="test_dag", task_id="test_task", run_id="test_run", try_number=1) + + # Call _change_state with FAILED status and failure details + executor._change_state( + key=task_key, + state=TaskInstanceState.FAILED, + pod_name="test-pod", + namespace="test-namespace", + failure_details=failure_details, + ) + + # Verify that the warning log was called with expected parameters + mock_log.warning.assert_called_once_with( + "Task %s failed in pod %s/%s. Pod phase: %s, reason: %s, message: %s, " + "container_type: %s, container_name: %s, container_state: %s, container_reason: %s, " + "container_message: %s, exit_code: %s", + "test_dag.test_task.1", + "test-namespace", + "test-pod", + "Failed", + "PodFailed", + "Pod execution failed", + "main", + "test-container", + "terminated", + "Error", + "Container failed with exit code 1", + 1, + ) + + @pytest.mark.execution_timeout(300) + @patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.log") + def test_pod_failure_logging_exception_handling(self, mock_log): + """Test that failures without details are handled gracefully.""" + from airflow.models.taskinstancekey import TaskInstanceKey + from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor + from airflow.utils.state import TaskInstanceState + + # Create a mock KubernetesExecutor instance + executor = KubernetesExecutor() + executor.kube_scheduler = Mock() + + # Create a test task key + task_key = TaskInstanceKey(dag_id="test_dag", task_id="test_task", run_id="test_run", try_number=1) + + # Call _change_state with FAILED status but no failure details + executor._change_state( + key=task_key, + state=TaskInstanceState.FAILED, + pod_name="test-pod", + namespace="test-namespace", + failure_details=None, + ) + + # Verify that the warning log was called with the correct parameters + mock_log.warning.assert_called_once_with( + "Task %s failed in pod %s/%s (no details available)", + "test_dag.test_task.1", + "test-namespace", + "test-pod", + ) + + @pytest.mark.execution_timeout(300) + @patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.log") + def test_pod_failure_logging_non_failed_state(self, mock_log): + """Test that pod failure logging only occurs for FAILED state.""" + from airflow.models.taskinstancekey import TaskInstanceKey + from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor + from airflow.utils.state import TaskInstanceState + + # Create a mock KubernetesExecutor instance + executor = KubernetesExecutor() + executor.kube_client = Mock() + executor.kube_scheduler = Mock() + + # Create a test task key + task_key = TaskInstanceKey(dag_id="test_dag", task_id="test_task", run_id="test_run", try_number=1) + + # Call _change_state with SUCCESS status + executor._change_state( + key=task_key, state=TaskInstanceState.SUCCESS, pod_name="test-pod", namespace="test-namespace" + ) + + # Verify that no failure logs were called + mock_log.error.assert_not_called() + mock_log.warning.assert_not_called() + + # Verify that kube_client methods were not called + executor.kube_client.read_namespaced_pod.assert_not_called() diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index 2f8ff5699e3da..dddaf7b00c43e 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -66,6 +66,7 @@ from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import ( ADOPTED, POD_EXECUTOR_DONE_KEY, + FailureDetails, ) from airflow.providers.cncf.kubernetes.kube_config import KubeConfig from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import annotations_to_key @@ -330,11 +331,11 @@ def sync(self) -> None: while True: results = self.result_queue.get_nowait() try: - key, state, pod_name, namespace, resource_version = results + key, state, pod_name, namespace, resource_version, failure_details = results last_resource_version[namespace] = resource_version self.log.info("Changing state of %s to %s", results, state) try: - self._change_state(key, state, pod_name, namespace) + self._change_state(key, state, pod_name, namespace, failure_details) except Exception as e: self.log.exception( "Exception: %s when attempting to change state of %s to %s, re-queueing.", @@ -412,11 +413,49 @@ def _change_state( state: TaskInstanceState | str | None, pod_name: str, namespace: str, + failure_details: FailureDetails | None = None, session: Session = NEW_SESSION, ) -> None: if TYPE_CHECKING: assert self.kube_scheduler + if state == TaskInstanceState.FAILED: + # Use pre-collected failure details from the watcher to avoid additional API calls + if failure_details: + pod_status = failure_details.get("pod_status") + pod_reason = failure_details.get("pod_reason") + pod_message = failure_details.get("pod_message") + container_state = failure_details.get("container_state") + container_reason = failure_details.get("container_reason") + container_message = failure_details.get("container_message") + exit_code = failure_details.get("exit_code") + container_type = failure_details.get("container_type") + container_name = failure_details.get("container_name") + + task_key_str = f"{key.dag_id}.{key.task_id}.{key.try_number}" + self.log.warning( + "Task %s failed in pod %s/%s. Pod phase: %s, reason: %s, message: %s, " + "container_type: %s, container_name: %s, container_state: %s, container_reason: %s, " + "container_message: %s, exit_code: %s", + task_key_str, + namespace, + pod_name, + pod_status, + pod_reason, + pod_message, + container_type, + container_name, + container_state, + container_reason, + container_message, + exit_code, + ) + else: + task_key_str = f"{key.dag_id}.{key.task_id}.{key.try_number}" + self.log.warning( + "Task %s failed in pod %s/%s (no details available)", task_key_str, namespace, pod_name + ) + if state == ADOPTED: # When the task pod is adopted by another executor, # then remove the task from the current executor running queue. @@ -696,12 +735,12 @@ def _flush_result_queue(self) -> None: results = self.result_queue.get_nowait() self.log.warning("Executor shutting down, flushing results=%s", results) try: - key, state, pod_name, namespace, resource_version = results + key, state, pod_name, namespace, resource_version, failure_details = results self.log.info( "Changing state of %s to %s : resource_version=%d", results, state, resource_version ) try: - self._change_state(key, state, pod_name, namespace) + self._change_state(key, state, pod_name, namespace, failure_details) except Exception as e: self.log.exception( "Ignoring exception: %s when attempting to change state of %s to %s.", diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py index 78ab517c52677..a475959e588da 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py @@ -16,9 +16,25 @@ # under the License. from __future__ import annotations -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Literal, TypedDict ADOPTED = "adopted" + + +class FailureDetails(TypedDict, total=False): + """Detailed information about pod/container failure.""" + + pod_status: str | None + pod_reason: str | None + pod_message: str | None + container_state: str | None + container_reason: str | None + container_message: str | None + exit_code: int | None + container_type: Literal["init", "main"] | None + container_name: str | None + + if TYPE_CHECKING: from collections.abc import Sequence @@ -31,11 +47,15 @@ # TaskInstance key, command, configuration, pod_template_file KubernetesJobType = tuple[TaskInstanceKey, CommandType, Any, str | None] - # key, pod state, pod_name, namespace, resource_version - KubernetesResultsType = tuple[TaskInstanceKey, TaskInstanceState | str | None, str, str, str] + # key, pod state, pod_name, namespace, resource_version, failure_details + KubernetesResultsType = tuple[ + TaskInstanceKey, TaskInstanceState | str | None, str, str, str, FailureDetails | None + ] - # pod_name, namespace, pod state, annotations, resource_version - KubernetesWatchType = tuple[str, str, TaskInstanceState | str | None, dict[str, str], str] + # pod_name, namespace, pod state, annotations, resource_version, failure_details + KubernetesWatchType = tuple[ + str, str, TaskInstanceState | str | None, dict[str, str], str, FailureDetails | None + ] ALL_NAMESPACES = "ALL_NAMESPACES" POD_EXECUTOR_DONE_KEY = "airflow_executor_done" diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index 122cfd9f8fab6..43a03ada7d97e 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -21,7 +21,7 @@ import multiprocessing import time from queue import Empty, Queue -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Literal, cast from kubernetes import client, watch from kubernetes.client.rest import ApiException @@ -34,6 +34,7 @@ ALL_NAMESPACES, POD_EXECUTOR_DONE_KEY, POD_REVOKED_KEY, + FailureDetails, ) from airflow.providers.cncf.kubernetes.kube_client import get_kube_client from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import ( @@ -216,16 +217,26 @@ def process_status( # So, there is no change in the pod state. # However, need to free the executor slot from the current executor. self.log.info("Event: pod %s adopted, annotations: %s", pod_name, annotations_string) - self.watcher_queue.put((pod_name, namespace, ADOPTED, annotations, resource_version)) + self.watcher_queue.put((pod_name, namespace, ADOPTED, annotations, resource_version, None)) elif hasattr(pod.status, "reason") and pod.status.reason == "ProviderFailed": # Most likely this happens due to Kubernetes setup (virtual kubelet, virtual nodes, etc.) - self.log.error( - "Event: %s failed to start with reason ProviderFailed, annotations: %s", + key = annotations_to_key(annotations=annotations) + task_key_str = f"{key.dag_id}.{key.task_id}.{key.try_number}" if key else "unknown" + self.log.warning( + "Event: %s failed to start with reason ProviderFailed, task: %s, annotations: %s", pod_name, + task_key_str, annotations_string, ) self.watcher_queue.put( - (pod_name, namespace, TaskInstanceState.FAILED, annotations, resource_version) + ( + pod_name, + namespace, + TaskInstanceState.FAILED, + annotations, + resource_version, + None, + ) ) elif status == "Pending": # deletion_timestamp is set by kube server when a graceful deletion is requested. @@ -254,14 +265,26 @@ def process_status( and container_status_state["waiting"]["message"] == "pull QPS exceeded" ): continue - self.log.error( - "Event: %s has container %s with fatal reason %s", + key = annotations_to_key(annotations=annotations) + task_key_str = ( + f"{key.dag_id}.{key.task_id}.{key.try_number}" if key else "unknown" + ) + self.log.warning( + "Event: %s has container %s with fatal reason %s, task: %s", pod_name, container_status["name"], container_status_state["waiting"]["reason"], + task_key_str, ) self.watcher_queue.put( - (pod_name, namespace, TaskInstanceState.FAILED, annotations, resource_version) + ( + pod_name, + namespace, + TaskInstanceState.FAILED, + annotations, + resource_version, + None, + ) ) break else: @@ -269,13 +292,32 @@ def process_status( else: self.log.debug("Event: %s Pending, annotations: %s", pod_name, annotations_string) elif status == "Failed": - self.log.error("Event: %s Failed, annotations: %s", pod_name, annotations_string) + # Collect failure details for failed pods + try: + failure_details = collect_pod_failure_details(pod, self.log) + except Exception as e: + self.log.warning( + "Failed to collect pod failure details for %s/%s: %s", namespace, pod_name, e + ) + + key = annotations_to_key(annotations=annotations) + task_key_str = f"{key.dag_id}.{key.task_id}.{key.try_number}" if key else "unknown" + self.log.warning( + "Event: %s Failed, task: %s, annotations: %s", pod_name, task_key_str, annotations_string + ) self.watcher_queue.put( - (pod_name, namespace, TaskInstanceState.FAILED, annotations, resource_version) + ( + pod_name, + namespace, + TaskInstanceState.FAILED, + annotations, + resource_version, + failure_details, + ) ) elif status == "Succeeded": self.log.info("Event: %s Succeeded, annotations: %s", pod_name, annotations_string) - self.watcher_queue.put((pod_name, namespace, None, annotations, resource_version)) + self.watcher_queue.put((pod_name, namespace, None, annotations, resource_version, None)) elif status == "Running": # deletion_timestamp is set by kube server when a graceful deletion is requested. # since kube server have received request to delete pod set TI state failed @@ -286,7 +328,14 @@ def process_status( annotations_string, ) self.watcher_queue.put( - (pod_name, namespace, TaskInstanceState.FAILED, annotations, resource_version) + ( + pod_name, + namespace, + TaskInstanceState.FAILED, + annotations, + resource_version, + None, + ) ) else: self.log.info("Event: %s is Running, annotations: %s", pod_name, annotations_string) @@ -302,6 +351,115 @@ def process_status( ) +def collect_pod_failure_details(pod: k8s.V1Pod, logger) -> FailureDetails | None: + """ + Collect detailed failure information from a failed pod. + + Analyzes both init containers and main containers to determine the root cause + of pod failure, prioritizing terminated containers with non-zero exit codes. + + Args: + pod: The Kubernetes V1Pod object to analyze + logger: Logger instance to use for error logging + + Returns: + FailureDetails dict with failure information, or None if no failure details found + """ + if not pod.status or pod.status.phase != "Failed": + return None + + try: + # Basic pod-level information + failure_details: FailureDetails = { + "pod_status": getattr(pod.status, "phase", None), + "pod_reason": getattr(pod.status, "reason", None), + "pod_message": getattr(pod.status, "message", None), + } + + # Check init containers first (they run before main containers) + container_failure = _analyze_init_containers(pod.status) + + # If no init container failure found, check main containers + if not container_failure: + container_failure = _analyze_main_containers(pod.status) + + # Merge container failure details + if container_failure: + failure_details.update(container_failure) + + return failure_details + + except Exception: + # Log unexpected exception for debugging + logger.exception( + "Unexpected error while collecting pod failure details for pod %s", + getattr(pod.metadata, "name", "unknown"), + ) + # Return basic pod info if container analysis fails + return { + "pod_status": getattr(pod.status, "phase", None), + "pod_reason": getattr(pod.status, "reason", None), + "pod_message": getattr(pod.status, "message", None), + } + + +def _analyze_containers( + container_statuses: list[k8s.V1ContainerStatus] | None, container_type: Literal["init", "main"] +) -> FailureDetails | None: + """Analyze container statuses for failure details.""" + if not container_statuses: + return None + + waiting_info: FailureDetails | None = None + + for cs in container_statuses: + state_obj = cs.state + if state_obj.terminated: + terminated_reason = getattr(state_obj.terminated, "reason", None) + exit_code = getattr(state_obj.terminated, "exit_code", 0) + + # Only treat as failure if exit code != 0 AND reason is not "Completed" + if exit_code != 0 and terminated_reason != "Completed": + return cast( + "FailureDetails", + { + "container_state": "terminated", + "container_reason": terminated_reason, + "container_message": getattr(state_obj.terminated, "message", None), + "exit_code": exit_code, + "container_type": container_type, + "container_name": getattr(cs, "name", "unknown"), + }, + ) + elif state_obj.waiting: + # Record waiting state but continue looking for terminated containers + waiting_info = cast( + "FailureDetails", + { + "container_state": "waiting", + "container_reason": getattr(state_obj.waiting, "reason", None), + "container_message": getattr(state_obj.waiting, "message", None), + "container_type": container_type, + "container_name": getattr(cs, "name", "unknown"), + }, + ) + + # If we only found waiting containers, return the last one + return waiting_info + + +def _analyze_init_containers(pod_status: k8s.V1PodStatus) -> FailureDetails | None: + """Analyze init container statuses for failure details.""" + init_container_statuses = getattr(pod_status, "init_container_statuses", None) + return _analyze_containers(init_container_statuses, "init") + + +def _analyze_main_containers(pod_status: k8s.V1PodStatus) -> FailureDetails | None: + """Analyze main container statuses for failure details.""" + container_statuses = getattr(pod_status, "container_statuses", None) + return _analyze_containers(container_statuses, "main") + + class AirflowKubernetesScheduler(LoggingMixin): """Airflow Scheduler for Kubernetes.""" @@ -504,7 +662,7 @@ def sync(self) -> None: def process_watcher_task(self, task: KubernetesWatchType) -> None: """Process the task by watcher.""" - pod_name, namespace, state, annotations, resource_version = task + pod_name, namespace, state, annotations, resource_version, failure_details = task self.log.debug( "Attempting to finish pod; pod_name: %s; state: %s; annotations: %s", pod_name, @@ -514,7 +672,7 @@ def process_watcher_task(self, task: KubernetesWatchType) -> None: key = annotations_to_key(annotations=annotations) if key: self.log.debug("finishing job %s - %s (%s)", key, state, pod_name) - self.result_queue.put((key, state, pod_name, namespace, resource_version)) + self.result_queue.put((key, state, pod_name, namespace, resource_version, failure_details)) def _flush_watcher_queue(self) -> None: self.log.debug("Executor shutting down, watcher_queue approx. size=%d", self.watcher_queue.qsize()) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index 31899080ceaf1..c08ed6686222f 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -53,7 +53,12 @@ get_logs_task_metadata, ) from airflow.providers.standard.operators.empty import EmptyOperator -from airflow.utils import timezone + +try: + from airflow.sdk import timezone +except ImportError: + # Fallback for older Airflow location where timezone is in utils + from airflow.utils import timezone # type: ignore[attr-defined,no-redef] from airflow.utils.state import State, TaskInstanceState from tests_common.test_utils.config import conf_vars @@ -675,7 +680,7 @@ def test_change_state_running(self, mock_get_kube_client, mock_kubernetes_job_wa executor = self.kubernetes_executor executor.start() try: - key = ("dag_id", "task_id", "run_id", "try_number1") + key = TaskInstanceKey(dag_id="dag_id", task_id="task_id", run_id="run_id", try_number=1) executor.running = {key} executor._change_state(key, State.RUNNING, "pod_name", "default") assert executor.event_buffer[key][0] == State.RUNNING @@ -693,7 +698,7 @@ def test_change_state_success(self, mock_delete_pod, mock_get_kube_client, mock_ executor = self.kubernetes_executor executor.start() try: - key = ("dag_id", "task_id", "run_id", "try_number2") + key = TaskInstanceKey(dag_id="dag_id", task_id="task_id", run_id="run_id", try_number=2) executor.running = {key} executor._change_state(key, State.SUCCESS, "pod_name", "default") assert executor.event_buffer[key][0] == State.SUCCESS @@ -718,7 +723,7 @@ def test_change_state_failed_no_deletion( executor.kube_config.delete_worker_pods_on_failure = False executor.start() try: - key = ("dag_id", "task_id", "run_id", "try_number3") + key = TaskInstanceKey(dag_id="dag_id", task_id="task_id", run_id="run_id", try_number=3) executor.running = {key} executor._change_state(key, State.FAILED, "pod_id", "test-namespace") assert executor.event_buffer[key][0] == State.FAILED @@ -769,7 +774,7 @@ def test_change_state_adopted(self, mock_delete_pod, mock_get_kube_client, mock_ executor = self.kubernetes_executor executor.start() try: - key = ("dag_id", "task_id", "run_id", "try_number2") + key = TaskInstanceKey(dag_id="dag_id", task_id="task_id", run_id="run_id", try_number=2) executor.running = {key} executor._change_state(key, ADOPTED, "pod_name", "default") assert len(executor.event_buffer) == 0 @@ -785,7 +790,7 @@ def test_change_state_key_not_in_running(self, mock_get_kube_client, mock_kubern executor = self.kubernetes_executor executor.start() try: - key = ("dag_id", "task_id", "run_id", "try_number1") + key = TaskInstanceKey(dag_id="dag_id", task_id="task_id", run_id="run_id", try_number=1) executor.running = set() executor._change_state(key, State.SUCCESS, "pod_name", "default") assert executor.event_buffer.get(key) is None @@ -833,7 +838,7 @@ def test_change_state_skip_pod_deletion( executor.start() try: - key = ("dag_id", "task_id", "run_id", "try_number2") + key = TaskInstanceKey(dag_id="dag_id", task_id="task_id", run_id="run_id", try_number=2) executor.running = {key} executor._change_state(key, State.SUCCESS, "pod_name", "test-namespace") assert executor.event_buffer[key][0] == State.SUCCESS @@ -859,7 +864,7 @@ def test_change_state_failed_pod_deletion( executor.start() try: - key = ("dag_id", "task_id", "run_id", "try_number2") + key = TaskInstanceKey(dag_id="dag_id", task_id="task_id", run_id="run_id", try_number=2) executor.running = {key} executor._change_state(key, State.FAILED, "pod_name", "test-namespace") assert executor.event_buffer[key][0] == State.FAILED @@ -1483,6 +1488,7 @@ def assert_watcher_queue_called_once_with_state(self, state): state, self.core_annotations, self.pod.metadata.resource_version, + mock.ANY, # failure_details can be any value including None ) ) @@ -1719,6 +1725,7 @@ def test_process_status_pod_adopted(self, ti_state): ADOPTED, self.core_annotations, self.pod.metadata.resource_version, + None, # failure_details is None for ADOPTED state ) )