Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,11 @@ class PodMutationHookException(AirflowException):

class PodReconciliationError(AirflowException):
"""Raised when an error is encountered while trying to merge pod configs."""


class KubernetesApiError(AirflowException):
    """Raised when an error is encountered while trying to access the Kubernetes API."""


class KubernetesApiPermissionError(AirflowException):
    """Raised when the Kubernetes API denies a request with a permission error (HTTP 403)."""
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

from airflow.exceptions import AirflowException, AirflowNotFoundException
from airflow.models import Connection
from airflow.providers.cncf.kubernetes.exceptions import KubernetesApiError, KubernetesApiPermissionError
from airflow.providers.cncf.kubernetes.kube_client import _disable_verify_ssl, _enable_tcp_keepalive
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import should_retry_creation
from airflow.providers.cncf.kubernetes.utils.container import (
Expand All @@ -48,7 +49,7 @@

if TYPE_CHECKING:
from kubernetes.client import V1JobList
from kubernetes.client.models import V1Job, V1Pod
from kubernetes.client.models import CoreV1EventList, V1Job, V1Pod

LOADING_KUBE_CONFIG_FILE_RESOURCE = "Loading Kubernetes configuration file kube_config from {}..."

Expand Down Expand Up @@ -878,12 +879,17 @@ async def get_pod(self, name: str, namespace: str) -> V1Pod:
:param namespace: Name of the pod's namespace.
"""
async with self.get_conn() as connection:
v1_api = async_client.CoreV1Api(connection)
pod: V1Pod = await v1_api.read_namespaced_pod(
name=name,
namespace=namespace,
)
return pod
try:
v1_api = async_client.CoreV1Api(connection)
pod: V1Pod = await v1_api.read_namespaced_pod(
name=name,
namespace=namespace,
)
return pod
except HTTPError as e:
if hasattr(e, "status") and e.status == 403:
raise KubernetesApiPermissionError("Permission denied (403) from Kubernetes API.") from e
raise KubernetesApiError from e

async def delete_pod(self, name: str, namespace: str):
"""
Expand Down Expand Up @@ -932,6 +938,21 @@ async def read_logs(self, name: str, namespace: str):
self.log.exception("There was an error reading the kubernetes API.")
raise

async def get_pod_events(self, name: str, namespace: str) -> CoreV1EventList:
    """
    Get pod's events.

    :param name: Name of the pod.
    :param namespace: Name of the pod's namespace.
    :return: Events in ``namespace`` whose involved object is named ``name``.
    :raises KubernetesApiPermissionError: If the API call fails with status 403.
    :raises KubernetesApiError: If the API call fails with any other ``HTTPError``.
    """
    async with self.get_conn() as connection:
        try:
            v1_api = async_client.CoreV1Api(connection)
            events: CoreV1EventList = await v1_api.list_namespaced_event(
                field_selector=f"involvedObject.name={name}",
                namespace=namespace,
            )
            return events
        except HTTPError as e:
            # ``status`` may be absent on the caught error, so read it defensively.
            if getattr(e, "status", None) == 403:
                raise KubernetesApiPermissionError("Permission denied (403) from Kubernetes API.") from e
            # Carry a message so the failure stays identifiable even where only
            # ``str(exc)`` is reported rather than the full chained traceback.
            raise KubernetesApiError(
                f"Failed to list events for pod {name!r} in namespace {namespace!r}: {e}"
            ) from e

async def get_job_status(self, name: str, namespace: str) -> V1Job:
"""
Get job's status object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -867,6 +867,7 @@ def invoke_defer_method(self, last_log_time: DateTime | None = None) -> None:
get_logs=self.get_logs,
startup_timeout=self.startup_timeout_seconds,
startup_check_interval=self.startup_check_interval_seconds,
schedule_timeout=self.schedule_timeout_seconds,
base_container_name=self.base_container_name,
on_finish_action=self.on_finish_action.value,
last_log_time=last_log_time,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@

import tenacity

from airflow.providers.cncf.kubernetes.exceptions import KubernetesApiPermissionError
from airflow.providers.cncf.kubernetes.hooks.kubernetes import AsyncKubernetesHook
from airflow.providers.cncf.kubernetes.utils.pod_manager import (
AsyncPodManager,
OnFinishAction,
PodLaunchTimeoutException,
PodPhase,
Expand Down Expand Up @@ -69,6 +71,7 @@ class KubernetesPodTrigger(BaseTrigger):
:param get_logs: get the stdout of the container as logs of the tasks.
:param startup_timeout: timeout in seconds to start up the pod.
:param startup_check_interval: interval in seconds to check if the pod has already started.
:param schedule_timeout: timeout in seconds to schedule pod in cluster.
:param on_finish_action: What to do when the pod reaches its final state, or the execution is interrupted.
If "delete_pod", the pod will be deleted regardless its state; if "delete_succeeded_pod",
only succeeded pod will be deleted. You can set to "keep_pod" to keep the pod.
Expand All @@ -91,7 +94,8 @@ def __init__(
in_cluster: bool | None = None,
get_logs: bool = True,
startup_timeout: int = 120,
startup_check_interval: int = 5,
startup_check_interval: float = 5,
schedule_timeout: int = 120,
on_finish_action: str = "delete_pod",
last_log_time: DateTime | None = None,
logging_interval: int | None = None,
Expand All @@ -110,11 +114,11 @@ def __init__(
self.get_logs = get_logs
self.startup_timeout = startup_timeout
self.startup_check_interval = startup_check_interval
self.schedule_timeout = schedule_timeout
self.last_log_time = last_log_time
self.logging_interval = logging_interval
self.on_finish_action = OnFinishAction(on_finish_action)
self.trigger_kwargs = trigger_kwargs or {}

self._since_time = None

def serialize(self) -> tuple[str, dict[str, Any]]:
Expand All @@ -133,6 +137,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
"get_logs": self.get_logs,
"startup_timeout": self.startup_timeout,
"startup_check_interval": self.startup_check_interval,
"schedule_timeout": self.schedule_timeout,
"trigger_start_time": self.trigger_start_time,
"on_finish_action": self.on_finish_action.value,
"last_log_time": self.last_log_time,
Expand Down Expand Up @@ -182,6 +187,22 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
}
)
return
except KubernetesApiPermissionError as e:
message = (
"Kubernetes API permission error: The triggerer may not have sufficient permissions to monitor or delete pods. "
"Please ensure the triggerer's service account is included in the 'pod-launcher-role' as defined in the latest Airflow Helm chart. "
f"Original error: {e}"
)
yield TriggerEvent(
{
"name": self.pod_name,
"namespace": self.pod_namespace,
"status": "error",
"message": message,
**self.trigger_kwargs,
}
)
return
except Exception as e:
yield TriggerEvent(
{
Expand Down Expand Up @@ -209,17 +230,16 @@ def _format_exception_description(self, exc: Exception) -> Any:

async def _wait_for_pod_start(self) -> ContainerState:
"""Loops until pod phase leaves ``PENDING`` If timeout is reached, throws error."""
while True:
pod = await self._get_pod()
if not pod.status.phase == "Pending":
return self.define_container_state(pod)

delta = datetime.datetime.now(tz=datetime.timezone.utc) - self.trigger_start_time
if self.startup_timeout < delta.total_seconds():
raise PodLaunchTimeoutException("Pod did not leave 'Pending' phase within specified timeout")

self.log.info("Still waiting for pod to start. The pod state is %s", pod.status.phase)
await asyncio.sleep(self.startup_check_interval)
pod = await self._get_pod()
events_task = self.pod_manager.watch_pod_events(pod, self.startup_check_interval)
pod_start_task = self.pod_manager.await_pod_start(
pod=pod,
schedule_timeout=self.schedule_timeout,
startup_timeout=self.startup_timeout,
check_interval=self.startup_check_interval,
)
await asyncio.gather(pod_start_task, events_task)
return self.define_container_state(await self._get_pod())

async def _wait_for_container_completion(self) -> TriggerEvent:
"""
Expand Down Expand Up @@ -287,6 +307,10 @@ def hook(self) -> AsyncKubernetesHook:
cluster_context=self.cluster_context,
)

@cached_property
def pod_manager(self) -> AsyncPodManager:
    """Return an ``AsyncPodManager`` built on this trigger's async hook (``self.hook``)."""
    return AsyncPodManager(async_hook=self.hook)

def define_container_state(self, pod: V1Pod) -> ContainerState:
pod_containers = pod.status.container_statuses

Expand Down
Loading