Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions airflow/providers/cncf/kubernetes/operators/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import warnings
from collections.abc import Container
from contextlib import AbstractContextManager
from enum import Enum
from functools import cached_property
from typing import TYPE_CHECKING, Any, Callable, Iterable, Sequence

Expand Down Expand Up @@ -95,6 +96,13 @@
KUBE_CONFIG_ENV_VAR = "KUBECONFIG"


class PodEventType(Enum):
"""Type of Events emitted by kubernetes pod."""

WARNING = "Warning"
NORMAL = "Normal"


class PodReattachFailure(AirflowException):
"""When we expect to be able to find a pod but cannot."""

Expand Down Expand Up @@ -548,8 +556,7 @@ def await_pod_start(self, pod: k8s.V1Pod):
)
except PodLaunchFailedException:
if self.log_events_on_failure:
for event in self.pod_manager.read_pod_events(pod).items:
self.log.error("Pod Event: %s - %s", event.reason, event.message)
self._read_pod_events(pod, reraise=False)
raise

def extract_xcom(self, pod: k8s.V1Pod):
Expand Down Expand Up @@ -855,7 +862,10 @@ def _read_pod_events(self, pod, *, reraise=True):
"""Will fetch and emit events from pod."""
with _optionally_suppress(reraise=reraise):
for event in self.pod_manager.read_pod_events(pod).items:
self.log.error("Pod Event: %s - %s", event.reason, event.message)
if event.type == PodEventType.NORMAL.value:
self.log.info("Pod Event: %s - %s", event.reason, event.message)
else:
self.log.error("Pod Event: %s - %s", event.reason, event.message)

def is_istio_enabled(self, pod: V1Pod) -> bool:
"""Check if istio is enabled for the namespace of the pod by inspecting the namespace labels."""
Expand Down
44 changes: 42 additions & 2 deletions tests/providers/cncf/kubernetes/operators/test_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,18 @@

import pendulum
import pytest
from kubernetes.client import ApiClient, V1PodSecurityContext, V1PodStatus, models as k8s
from kubernetes.client import ApiClient, V1Pod, V1PodSecurityContext, V1PodStatus, models as k8s
from urllib3 import HTTPResponse

from airflow.exceptions import AirflowException, AirflowSkipException, TaskDeferred
from airflow.models import DAG, DagModel, DagRun, TaskInstance
from airflow.models.xcom import XCom
from airflow.providers.cncf.kubernetes import pod_generator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator, _optionally_suppress
from airflow.providers.cncf.kubernetes.operators.pod import (
KubernetesPodOperator,
PodEventType,
_optionally_suppress,
)
from airflow.providers.cncf.kubernetes.secret import Secret
from airflow.providers.cncf.kubernetes.triggers.pod import KubernetesPodTrigger
from airflow.providers.cncf.kubernetes.utils.pod_manager import (
Expand Down Expand Up @@ -2154,3 +2158,39 @@ def test_async_skip_kpo_wait_termination_with_timeout_event(mock_manager, mocked

# assert that the cleanup is called
post_complete_action.assert_called_once()


@patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.pod_manager")
@patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.log")
def test_read_pod_events(mock_log, mock_pod_manager):
# Create a mock pod
pod = V1Pod()

# Create mock events
mock_event_normal = MagicMock()
mock_event_normal.type = PodEventType.NORMAL.value
mock_event_normal.reason = "test-reason-normal"
mock_event_normal.message = "test-message-normal"

mock_event_error = MagicMock()
mock_event_error.type = PodEventType.WARNING.value
mock_event_error.reason = "test-reason-error"
mock_event_error.message = "test-message-error"

mock_pod_manager.read_pod_events.return_value.items = [mock_event_normal, mock_event_error]

operator = KubernetesPodOperator(task_id="test-task")
operator._read_pod_events(pod)

# Assert that event with type `Normal` is logged as info.
mock_log.info.assert_called_once_with(
"Pod Event: %s - %s",
mock_event_normal.reason,
mock_event_normal.message,
)
# Assert that event with type `Warning` is logged as error.
mock_log.error.assert_called_once_with(
"Pod Event: %s - %s",
mock_event_error.reason,
mock_event_error.message,
)