diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py index 4955db763329b..e926a1f49dde3 100644 --- a/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/airflow/providers/cncf/kubernetes/operators/pod.py @@ -27,6 +27,7 @@ import warnings from collections.abc import Container from contextlib import AbstractContextManager +from enum import Enum from functools import cached_property from typing import TYPE_CHECKING, Any, Callable, Iterable, Sequence @@ -95,6 +96,13 @@ KUBE_CONFIG_ENV_VAR = "KUBECONFIG" +class PodEventType(Enum): + """Type of Events emitted by kubernetes pod.""" + + WARNING = "Warning" + NORMAL = "Normal" + + class PodReattachFailure(AirflowException): """When we expect to be able to find a pod but cannot.""" @@ -548,8 +556,7 @@ def await_pod_start(self, pod: k8s.V1Pod): ) except PodLaunchFailedException: if self.log_events_on_failure: - for event in self.pod_manager.read_pod_events(pod).items: - self.log.error("Pod Event: %s - %s", event.reason, event.message) + self._read_pod_events(pod, reraise=False) raise def extract_xcom(self, pod: k8s.V1Pod): @@ -855,7 +862,10 @@ def _read_pod_events(self, pod, *, reraise=True): """Will fetch and emit events from pod.""" with _optionally_suppress(reraise=reraise): for event in self.pod_manager.read_pod_events(pod).items: - self.log.error("Pod Event: %s - %s", event.reason, event.message) + if event.type == PodEventType.NORMAL.value: + self.log.info("Pod Event: %s - %s", event.reason, event.message) + else: + self.log.error("Pod Event: %s - %s", event.reason, event.message) def is_istio_enabled(self, pod: V1Pod) -> bool: """Check if istio is enabled for the namespace of the pod by inspecting the namespace labels.""" diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py index 5d914c8d60941..3b1d61c53670d 100644 --- a/tests/providers/cncf/kubernetes/operators/test_pod.py +++ b/tests/providers/cncf/kubernetes/operators/test_pod.py @@ -24,14 +24,18 @@ import pendulum import pytest -from kubernetes.client import ApiClient, V1PodSecurityContext, V1PodStatus, models as k8s +from kubernetes.client import ApiClient, V1Pod, V1PodSecurityContext, V1PodStatus, models as k8s from urllib3 import HTTPResponse from airflow.exceptions import AirflowException, AirflowSkipException, TaskDeferred from airflow.models import DAG, DagModel, DagRun, TaskInstance from airflow.models.xcom import XCom from airflow.providers.cncf.kubernetes import pod_generator -from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator, _optionally_suppress +from airflow.providers.cncf.kubernetes.operators.pod import ( + KubernetesPodOperator, + PodEventType, + _optionally_suppress, +) from airflow.providers.cncf.kubernetes.secret import Secret from airflow.providers.cncf.kubernetes.triggers.pod import KubernetesPodTrigger from airflow.providers.cncf.kubernetes.utils.pod_manager import ( @@ -2154,3 +2158,39 @@ def test_async_skip_kpo_wait_termination_with_timeout_event(mock_manager, mocked # assert that the cleanup is called post_complete_action.assert_called_once() + + +@patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.pod_manager") +@patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.log") +def test_read_pod_events(mock_log, mock_pod_manager): + # Create a mock pod + pod = V1Pod() + + # Create mock events + mock_event_normal = MagicMock() + mock_event_normal.type = PodEventType.NORMAL.value + mock_event_normal.reason = "test-reason-normal" + mock_event_normal.message = "test-message-normal" + + mock_event_error = MagicMock() + mock_event_error.type = PodEventType.WARNING.value + mock_event_error.reason = "test-reason-error" + mock_event_error.message = "test-message-error" + + mock_pod_manager.read_pod_events.return_value.items = [mock_event_normal, mock_event_error] + + operator = KubernetesPodOperator(task_id="test-task") + operator._read_pod_events(pod) + + # Assert that event with type `Normal` is logged as info. + mock_log.info.assert_called_once_with( + "Pod Event: %s - %s", + mock_event_normal.reason, + mock_event_normal.message, + ) + # Assert that event with type `Warning` is logged as error. + mock_log.error.assert_called_once_with( + "Pod Event: %s - %s", + mock_event_error.reason, + mock_event_error.message, + )