Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions kubernetes-tests/tests/kubernetes_tests/test_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,14 @@
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING
from unittest.mock import Mock, patch

import pytest

if TYPE_CHECKING:
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import FailureDetails

from kubernetes_tests.test_base import (
EXECUTOR,
BaseK8STest, # isort:skip (needed to workaround isort bug)
Expand Down Expand Up @@ -102,3 +108,120 @@ def test_integration_run_dag_with_scheduler_failure(self):
)

assert self._num_pods_in_namespace("test-namespace") == 0, "failed to delete pods in other namespace"

@pytest.mark.execution_timeout(300)
@patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.log")
def test_pod_failure_logging_with_container_terminated(self, mock_log):
"""Test that pod failure information is logged when container is terminated."""

from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor
from airflow.utils.state import TaskInstanceState

# Create a mock KubernetesExecutor instance
executor = KubernetesExecutor()
executor.kube_scheduler = Mock()

# Create test failure details
failure_details: FailureDetails = {
"pod_status": "Failed",
"pod_reason": "PodFailed",
"pod_message": "Pod execution failed",
"container_state": "terminated",
"container_reason": "Error",
"container_message": "Container failed with exit code 1",
"exit_code": 1,
"container_type": "main",
"container_name": "test-container",
}

# Create a test task key
task_key = TaskInstanceKey(dag_id="test_dag", task_id="test_task", run_id="test_run", try_number=1)

# Call _change_state with FAILED status and failure details
executor._change_state(
key=task_key,
state=TaskInstanceState.FAILED,
pod_name="test-pod",
namespace="test-namespace",
failure_details=failure_details,
)

# Verify that the warning log was called with expected parameters
mock_log.warning.assert_called_once_with(
"Task %s failed in pod %s/%s. Pod phase: %s, reason: %s, message: %s, "
"container_type: %s, container_name: %s, container_state: %s, container_reason: %s, "
"container_message: %s, exit_code: %s",
"test_dag.test_task.1",
"test-namespace",
"test-pod",
"Failed",
"PodFailed",
"Pod execution failed",
"main",
"test-container",
"terminated",
"Error",
"Container failed with exit code 1",
1,
)

@pytest.mark.execution_timeout(300)
@patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.log")
def test_pod_failure_logging_exception_handling(self, mock_log):
"""Test that failures without details are handled gracefully."""
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor
from airflow.utils.state import TaskInstanceState

# Create a mock KubernetesExecutor instance
executor = KubernetesExecutor()
executor.kube_scheduler = Mock()

# Create a test task key
task_key = TaskInstanceKey(dag_id="test_dag", task_id="test_task", run_id="test_run", try_number=1)

# Call _change_state with FAILED status but no failure details
executor._change_state(
key=task_key,
state=TaskInstanceState.FAILED,
pod_name="test-pod",
namespace="test-namespace",
failure_details=None,
)

# Verify that the warning log was called with the correct parameters
mock_log.warning.assert_called_once_with(
"Task %s failed in pod %s/%s (no details available)",
"test_dag.test_task.1",
"test-namespace",
"test-pod",
)

@pytest.mark.execution_timeout(300)
@patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.log")
def test_pod_failure_logging_non_failed_state(self, mock_log):
"""Test that pod failure logging only occurs for FAILED state."""
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor
from airflow.utils.state import TaskInstanceState

# Create a mock KubernetesExecutor instance
executor = KubernetesExecutor()
executor.kube_client = Mock()
executor.kube_scheduler = Mock()

# Create a test task key
task_key = TaskInstanceKey(dag_id="test_dag", task_id="test_task", run_id="test_run", try_number=1)

# Call _change_state with SUCCESS status
executor._change_state(
key=task_key, state=TaskInstanceState.SUCCESS, pod_name="test-pod", namespace="test-namespace"
)

# Verify that no failure logs were called
mock_log.error.assert_not_called()
mock_log.warning.assert_not_called()

# Verify that kube_client methods were not called
executor.kube_client.read_namespaced_pod.assert_not_called()
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import (
ADOPTED,
POD_EXECUTOR_DONE_KEY,
FailureDetails,
)
from airflow.providers.cncf.kubernetes.kube_config import KubeConfig
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import annotations_to_key
Expand Down Expand Up @@ -330,11 +331,11 @@ def sync(self) -> None:
while True:
results = self.result_queue.get_nowait()
try:
key, state, pod_name, namespace, resource_version = results
key, state, pod_name, namespace, resource_version, failure_details = results
last_resource_version[namespace] = resource_version
self.log.info("Changing state of %s to %s", results, state)
try:
self._change_state(key, state, pod_name, namespace)
self._change_state(key, state, pod_name, namespace, failure_details)
except Exception as e:
self.log.exception(
"Exception: %s when attempting to change state of %s to %s, re-queueing.",
Expand Down Expand Up @@ -412,11 +413,49 @@ def _change_state(
state: TaskInstanceState | str | None,
pod_name: str,
namespace: str,
failure_details: FailureDetails | None = None,
session: Session = NEW_SESSION,
) -> None:
if TYPE_CHECKING:
assert self.kube_scheduler

if state == TaskInstanceState.FAILED:
# Use pre-collected failure details from the watcher to avoid additional API calls
if failure_details:
pod_status = failure_details.get("pod_status")
pod_reason = failure_details.get("pod_reason")
pod_message = failure_details.get("pod_message")
container_state = failure_details.get("container_state")
container_reason = failure_details.get("container_reason")
container_message = failure_details.get("container_message")
exit_code = failure_details.get("exit_code")
container_type = failure_details.get("container_type")
container_name = failure_details.get("container_name")

task_key_str = f"{key.dag_id}.{key.task_id}.{key.try_number}"
self.log.warning(
"Task %s failed in pod %s/%s. Pod phase: %s, reason: %s, message: %s, "
"container_type: %s, container_name: %s, container_state: %s, container_reason: %s, "
"container_message: %s, exit_code: %s",
task_key_str,
namespace,
pod_name,
pod_status,
pod_reason,
pod_message,
container_type,
container_name,
container_state,
container_reason,
container_message,
exit_code,
)
else:
task_key_str = f"{key.dag_id}.{key.task_id}.{key.try_number}"
self.log.warning(
"Task %s failed in pod %s/%s (no details available)", task_key_str, namespace, pod_name
)

if state == ADOPTED:
# When the task pod is adopted by another executor,
# then remove the task from the current executor running queue.
Expand Down Expand Up @@ -696,12 +735,12 @@ def _flush_result_queue(self) -> None:
results = self.result_queue.get_nowait()
self.log.warning("Executor shutting down, flushing results=%s", results)
try:
key, state, pod_name, namespace, resource_version = results
key, state, pod_name, namespace, resource_version, failure_details = results
self.log.info(
"Changing state of %s to %s : resource_version=%d", results, state, resource_version
)
try:
self._change_state(key, state, pod_name, namespace)
self._change_state(key, state, pod_name, namespace, failure_details)
except Exception as e:
self.log.exception(
"Ignoring exception: %s when attempting to change state of %s to %s.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,25 @@
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Literal, TypedDict

ADOPTED = "adopted"


class FailureDetails(TypedDict, total=False):
"""Detailed information about pod/container failure."""

pod_status: str | None
pod_reason: str | None
pod_message: str | None
container_state: str | None
container_reason: str | None
container_message: str | None
exit_code: int | None
container_type: Literal["init", "main"] | None
container_name: str | None


if TYPE_CHECKING:
from collections.abc import Sequence

Expand All @@ -31,11 +47,15 @@
# TaskInstance key, command, configuration, pod_template_file
KubernetesJobType = tuple[TaskInstanceKey, CommandType, Any, str | None]

# key, pod state, pod_name, namespace, resource_version
KubernetesResultsType = tuple[TaskInstanceKey, TaskInstanceState | str | None, str, str, str]
# key, pod state, pod_name, namespace, resource_version, failure_details
KubernetesResultsType = tuple[
TaskInstanceKey, TaskInstanceState | str | None, str, str, str, FailureDetails | None
]

# pod_name, namespace, pod state, annotations, resource_version
KubernetesWatchType = tuple[str, str, TaskInstanceState | str | None, dict[str, str], str]
# pod_name, namespace, pod state, annotations, resource_version, failure_details
KubernetesWatchType = tuple[
str, str, TaskInstanceState | str | None, dict[str, str], str, FailureDetails | None
]

ALL_NAMESPACES = "ALL_NAMESPACES"
POD_EXECUTOR_DONE_KEY = "airflow_executor_done"
Expand Down
Loading
Loading