Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions chart/templates/rbac/pod-launcher-role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,5 @@ rules:
- "events"
verbs:
- "list"
- "watch"
{{- end }}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import contextlib
import json
import tempfile
from collections.abc import Generator
from functools import cached_property
from time import sleep
from typing import TYPE_CHECKING, Any, Protocol
Expand All @@ -31,7 +30,7 @@
from kubernetes import client, config, utils, watch
from kubernetes.client.models import V1Deployment
from kubernetes.config import ConfigException
from kubernetes_asyncio import client as async_client, config as async_config
from kubernetes_asyncio import client as async_client, config as async_config, watch as async_watch
from urllib3.exceptions import HTTPError

from airflow.exceptions import AirflowException, AirflowNotFoundException
Expand All @@ -47,8 +46,10 @@
from airflow.utils import yaml

if TYPE_CHECKING:
from collections.abc import AsyncGenerator, Generator

from kubernetes.client import V1JobList
from kubernetes.client.models import CoreV1EventList, V1Job, V1Pod
from kubernetes.client.models import CoreV1Event, CoreV1EventList, V1Job, V1Pod

LOADING_KUBE_CONFIG_FILE_RESOURCE = "Loading Kubernetes configuration file kube_config from {}..."

Expand Down Expand Up @@ -783,6 +784,7 @@ def __init__(self, config_dict: dict | None = None, *args, **kwargs):

self.config_dict = config_dict
self._extras: dict | None = None
self._event_polling_fallback = False

async def _load_config(self):
"""Return Kubernetes API session for use with requests."""
Expand Down Expand Up @@ -954,21 +956,105 @@ async def read_logs(
raise KubernetesApiError from e

@generic_api_retry
async def get_pod_events(self, name: str, namespace: str) -> CoreV1EventList:
"""Get pod's events."""
async def get_pod_events(
self, name: str, namespace: str, resource_version: str | None = None
) -> CoreV1EventList:
"""
Get pod events.

:param name: Pod name to get events for
:param namespace: Kubernetes namespace
:param resource_version: Only return events not older than this resource version
"""
async with self.get_conn() as connection:
try:
v1_api = async_client.CoreV1Api(connection)
events: CoreV1EventList = await v1_api.list_namespaced_event(
field_selector=f"involvedObject.name={name}",
namespace=namespace,
resource_version=resource_version,
resource_version_match="NotOlderThan" if resource_version else None,
)
return events
except HTTPError as e:
if hasattr(e, "status") and e.status == 403:
raise KubernetesApiPermissionError("Permission denied (403) from Kubernetes API.") from e
raise KubernetesApiError from e

@generic_api_retry
async def watch_pod_events(
self,
name: str,
namespace: str,
resource_version: str | None = None,
timeout_seconds: int = 30,
) -> AsyncGenerator[CoreV1Event]:
"""
Watch pod events using Kubernetes Watch API.

:param name: Pod name to watch events for
:param namespace: Kubernetes namespace
:param resource_version: Only return events not older than this resource version
:param timeout_seconds: Timeout in seconds for the watch stream
"""
if self._event_polling_fallback:
async for event_polled in self.watch_pod_events_polling_fallback(
name, namespace, resource_version, timeout_seconds
):
yield event_polled

try:
w = async_watch.Watch()
async with self.get_conn() as connection:
v1_api = async_client.CoreV1Api(connection)

async for event_watched in w.stream(
v1_api.list_namespaced_event,
namespace=namespace,
field_selector=f"involvedObject.name={name}",
resource_version=resource_version,
timeout_seconds=timeout_seconds,
):
event: CoreV1Event = event_watched.get("object")
yield event

except async_client.exceptions.ApiException as e:
if hasattr(e, "status") and e.status == 403:
self.log.warning(
"Triggerer does not have Kubernetes API permission to 'watch' events: %s Falling back to polling.",
str(e),
)
self._event_polling_fallback = True
async for event_polled in self.watch_pod_events_polling_fallback(
name, namespace, resource_version, timeout_seconds
):
yield event_polled

finally:
w.stop()

async def watch_pod_events_polling_fallback(
self,
name: str,
namespace: str,
resource_version: str | None = None,
interval: int = 30,
) -> AsyncGenerator[CoreV1Event]:
"""
Fallback method to poll pod event at regular intervals.

This is required when the Airflow triggerer does not have permission to watch events.

:param name: Pod name to watch events for
:param namespace: Kubernetes namespace
:param resource_version: Only return events not older than this resource version
:param interval: Polling interval in seconds
"""
events: CoreV1EventList = await self.get_pod_events(name, namespace, resource_version)
for event in events.items:
yield event
await asyncio.sleep(interval)

@generic_api_retry
async def get_job_status(self, name: str, namespace: str) -> V1Job:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -630,14 +630,26 @@ def await_pod_start(self, pod: k8s.V1Pod) -> None:
try:

async def _await_pod_start():
events_task = self.pod_manager.watch_pod_events(pod, self.startup_check_interval_seconds)
pod_start_task = self.pod_manager.await_pod_start(
pod=pod,
schedule_timeout=self.schedule_timeout_seconds,
startup_timeout=self.startup_timeout_seconds,
check_interval=self.startup_check_interval_seconds,
# Start event stream in background
events_task = asyncio.create_task(
self.pod_manager.watch_pod_events(pod, self.startup_check_interval_seconds)
)
await asyncio.gather(pod_start_task, events_task)

# Await pod start completion
try:
await self.pod_manager.await_pod_start(
pod=pod,
schedule_timeout=self.schedule_timeout_seconds,
startup_timeout=self.startup_timeout_seconds,
check_interval=self.startup_check_interval_seconds,
)
finally:
# Stop watching events
events_task.cancel()
try:
await events_task
except asyncio.CancelledError:
pass

asyncio.run(_await_pod_start())
except PodLaunchFailedException:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,14 +241,25 @@ def _format_exception_description(self, exc: Exception) -> Any:
async def _wait_for_pod_start(self) -> ContainerState:
"""Loops until pod phase leaves ``PENDING`` If timeout is reached, throws error."""
pod = await self._get_pod()
events_task = self.pod_manager.watch_pod_events(pod, self.startup_check_interval)
pod_start_task = self.pod_manager.await_pod_start(
pod=pod,
schedule_timeout=self.schedule_timeout,
startup_timeout=self.startup_timeout,
check_interval=self.startup_check_interval,
)
await asyncio.gather(pod_start_task, events_task)
# Start event stream in background
events_task = asyncio.create_task(self.pod_manager.watch_pod_events(pod, self.startup_check_interval))

# Await pod start completion
try:
await self.pod_manager.await_pod_start(
pod=pod,
schedule_timeout=self.schedule_timeout,
startup_timeout=self.startup_timeout,
check_interval=self.startup_check_interval,
)
finally:
# Stop watching events
events_task.cancel()
try:
await events_task
except asyncio.CancelledError:
pass

return self.define_container_state(await self._get_pod())

async def _wait_for_container_completion(self) -> TriggerEvent:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
from airflow.utils.timezone import utcnow

if TYPE_CHECKING:
from kubernetes.client.models.core_v1_event import CoreV1Event
from kubernetes.client.models.core_v1_event_list import CoreV1EventList
from kubernetes.client.models.v1_container_state import V1ContainerState
from kubernetes.client.models.v1_container_state_waiting import V1ContainerStateWaiting
Expand Down Expand Up @@ -94,34 +95,21 @@ def check_exception_is_kubernetes_api_unauthorized(exc: BaseException):
return isinstance(exc, ApiException) and exc.status and str(exc.status) == "401"


async def watch_pod_events(
pod_manager: PodManager | AsyncPodManager,
pod: V1Pod,
check_interval: float = 1,
def log_pod_event(
pod_manager: PodManager | AsyncPodManager, event: CoreV1Event, seen_events: set[str]
) -> None:
"""
Read pod events and write them to the log.
Log a pod event if not already seen.

This function supports both asynchronous and synchronous pod managers.

:param pod_manager: The pod manager instance (PodManager or AsyncPodManager).
:param pod: The pod object to monitor.
:param check_interval: Interval (in seconds) between checks.
:param pod_manager: The pod manager instance for logging
:param event: Kubernetes event
:param seen_events: Set of event UIDs already logged to avoid duplicates
"""
num_events = 0
is_async = isinstance(pod_manager, AsyncPodManager)
while not pod_manager.stop_watching_events:
if is_async:
events = await pod_manager.read_pod_events(pod)
else:
events = pod_manager.read_pod_events(pod)
for new_event in events.items[num_events:]:
involved_object: V1ObjectReference = new_event.involved_object
pod_manager.log.info(
"The Pod has an Event: %s from %s", new_event.message, involved_object.field_path
)
num_events = len(events.items)
await asyncio.sleep(check_interval)
event_uid = event.metadata.uid
if event_uid not in seen_events:
seen_events.add(event_uid)
involved_object: V1ObjectReference = event.involved_object
pod_manager.log.info("The Pod has an Event: %s from %s", event.message, involved_object.field_path)


async def await_pod_start(
Expand Down Expand Up @@ -170,12 +158,14 @@ async def await_pod_start(
pod_manager.log.info("Waiting %ss to get the POD running...", startup_timeout)

if time.time() - start_check_time >= startup_timeout:
pod_manager.stop_watching_events = True
pod_manager.log.info("::endgroup::")
raise PodLaunchTimeoutException(
f"Pod took too long to start. More than {startup_timeout}s. Check the pod events in kubernetes."
)
else:
if time.time() - start_check_time >= schedule_timeout:
pod_manager.stop_watching_events = True
pod_manager.log.info("::endgroup::")
raise PodLaunchTimeoutException(
f"Pod took too long to be scheduled on the cluster, giving up. More than {schedule_timeout}s. Check the pod events in kubernetes."
Expand All @@ -188,6 +178,7 @@ async def await_pod_start(
container_waiting: V1ContainerStateWaiting | None = container_state.waiting
if container_waiting:
if container_waiting.reason in ["ErrImagePull", "InvalidImageName"]:
pod_manager.stop_watching_events = True
pod_manager.log.info("::endgroup::")
raise PodLaunchFailedException(
f"Pod docker image cannot be pulled, unable to start: {container_waiting.reason}"
Expand Down Expand Up @@ -354,9 +345,16 @@ def create_pod(self, pod: V1Pod) -> V1Pod:
"""Launch the pod asynchronously."""
return self.run_pod_async(pod)

async def watch_pod_events(self, pod: V1Pod, check_interval: int = 1) -> None:
"""Read pod events and writes into log."""
await watch_pod_events(pod_manager=self, pod=pod, check_interval=check_interval)
async def watch_pod_events(self, pod: V1Pod, check_interval: float = 10) -> None:
"""Read pod events and write into log."""
resource_version = None
seen_events: set[str] = set()
while not self.stop_watching_events:
events = self.read_pod_events(pod, resource_version)
for event in events.items:
log_pod_event(self, event, seen_events)
resource_version = event.metadata.resource_version
await asyncio.sleep(check_interval)

async def await_pod_start(
self, pod: V1Pod, schedule_timeout: int = 120, startup_timeout: int = 120, check_interval: int = 1
Expand Down Expand Up @@ -772,11 +770,20 @@ def get_container_names(self, pod: V1Pod) -> list[str]:
]

@generic_api_retry
def read_pod_events(self, pod: V1Pod) -> CoreV1EventList:
"""Read events from the POD."""
def read_pod_events(self, pod: V1Pod, resource_version: str | None = None) -> CoreV1EventList:
"""
Read events from the POD with optimization parameters to reduce API load.

:param pod: The pod to get events for
:param resource_version: Only return events newer than this resource version
:param limit: Maximum number of events to return
"""
try:
return self._client.list_namespaced_event(
namespace=pod.metadata.namespace, field_selector=f"involvedObject.name={pod.metadata.name}"
namespace=pod.metadata.namespace,
field_selector=f"involvedObject.name={pod.metadata.name}",
resource_version=resource_version,
resource_version_match="NotOlderThan" if resource_version else None,
)
except HTTPError as e:
raise KubernetesApiException(f"There was an error reading the kubernetes API: {e}")
Expand Down Expand Up @@ -978,16 +985,28 @@ async def read_pod(self, pod: V1Pod) -> V1Pod:
pod.metadata.namespace,
)

async def read_pod_events(self, pod: V1Pod) -> CoreV1EventList:
async def read_pod_events(self, pod: V1Pod, resource_version: str | None = None) -> CoreV1EventList:
"""Get pod's events."""
return await self._hook.get_pod_events(
pod.metadata.name,
pod.metadata.namespace,
resource_version=resource_version,
)

async def watch_pod_events(self, pod: V1Pod, check_interval: float = 1) -> None:
"""Read pod events and writes into log."""
await watch_pod_events(pod_manager=self, pod=pod, check_interval=check_interval)
async def watch_pod_events(self, pod: V1Pod, startup_check_interval: float = 30) -> None:
"""Watch pod events and write to log."""
seen_events: set[str] = set()
resource_version = None
while not self.stop_watching_events:
async for event in self._hook.watch_pod_events(
name=pod.metadata.name,
namespace=pod.metadata.namespace,
resource_version=resource_version,
timeout_seconds=startup_check_interval,
):
if event:
log_pod_event(self, event, seen_events)
resource_version = event.metadata.resource_version

async def await_pod_start(
self, pod: V1Pod, schedule_timeout: int = 120, startup_timeout: int = 120, check_interval: float = 1
Expand Down
Loading
Loading