Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from collections.abc import Generator
from functools import cached_property
from time import sleep
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Protocol

import aiofiles
import requests
Expand All @@ -39,8 +39,7 @@
from airflow.models import Connection
from airflow.providers.cncf.kubernetes.kube_client import _disable_verify_ssl, _enable_tcp_keepalive
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import should_retry_creation
from airflow.providers.cncf.kubernetes.utils.pod_manager import (
PodOperatorHookProtocol,
from airflow.providers.cncf.kubernetes.utils.container import (
container_is_completed,
container_is_running,
)
Expand Down Expand Up @@ -69,6 +68,36 @@ def _load_body_to_dict(body: str) -> dict:
return body_dict


class PodOperatorHookProtocol(Protocol):
    """
    Structural interface describing the hook members KubernetesPodOperator relies on.

    Subclasses of KubernetesPodOperator, such as GKEStartPodOperator, may use
    hooks that do not extend KubernetesHook. This protocol documents the members
    KPO uses so that such alternative hooks can be checked for them.
    """

    @property
    def core_v1_client(self) -> client.CoreV1Api:
        """Return the authenticated ``CoreV1Api`` client object."""

    @property
    def is_in_cluster(self) -> bool:
        """Tell whether the hook is configured with ``load_incluster_config``."""

    def get_pod(self, name: str, namespace: str) -> V1Pod:
        """Read the pod ``name`` in ``namespace`` from the Kubernetes API."""

    def get_namespace(self) -> str | None:
        """Return the namespace defined in the connection, if any."""

    def get_xcom_sidecar_container_image(self) -> str | None:
        """Return the xcom sidecar image defined in the connection, if any."""

    def get_xcom_sidecar_container_resources(self) -> str | None:
        """Return the xcom sidecar resources defined in the connection, if any."""


class KubernetesHook(BaseHook, PodOperatorHookProtocol):
"""
Creates Kubernetes API connection.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,17 @@
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
from airflow.providers.cncf.kubernetes.triggers.pod import KubernetesPodTrigger
from airflow.providers.cncf.kubernetes.utils import xcom_sidecar
from airflow.providers.cncf.kubernetes.utils.container import (
container_is_succeeded,
get_container_termination_message,
)
from airflow.providers.cncf.kubernetes.utils.pod_manager import (
EMPTY_XCOM_RESULT,
OnFinishAction,
PodLaunchFailedException,
PodManager,
PodNotFoundException,
PodOperatorHookProtocol,
PodPhase,
container_is_succeeded,
get_container_termination_message,
)
from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_1_PLUS, XCOM_RETURN_KEY

Expand All @@ -95,6 +96,7 @@
import jinja2
from pendulum import DateTime

from airflow.providers.cncf.kubernetes.hooks.kubernetes import PodOperatorHookProtocol
from airflow.providers.cncf.kubernetes.secret import Secret

try:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Helper functions for inspecting and interacting with containers in a Kubernetes Pod."""

from __future__ import annotations

from contextlib import suppress
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from kubernetes.client.models.v1_container_status import V1ContainerStatus
from kubernetes.client.models.v1_pod import V1Pod


def get_container_status(pod: V1Pod, container_name: str) -> V1ContainerStatus | None:
    """
    Find the status entry for ``container_name`` in ``pod``.

    Regular container statuses are searched first, followed by init container
    statuses; the first entry whose name matches is returned. Returns None when
    the pod or its status is missing, when no statuses are reported, or when no
    entry carries the requested name.
    """
    if not pod or not pod.status:
        return None

    pod_status = pod.status
    # Keep the original search order: regular containers before init containers.
    candidates = [
        *(pod_status.container_statuses or ()),
        *(pod_status.init_container_statuses or ()),
    ]
    for candidate in candidates:
        if candidate.name == container_name:
            return candidate
    return None


def container_is_running(pod: V1Pod, container_name: str) -> bool:
    """
    Report whether ``container_name`` in V1Pod ``pod`` is currently running.

    Returns True only when a status entry for the container exists and its
    ``state.running`` is set. Returns False otherwise.
    """
    status = get_container_status(pod, container_name)
    return bool(status) and status.state.running is not None


def container_is_completed(pod: V1Pod, container_name: str) -> bool:
    """
    Report whether ``container_name`` in V1Pod ``pod`` has completed.

    Returns True only when a status entry for the container exists and its
    ``state.terminated`` is set. Returns False otherwise.
    """
    status = get_container_status(pod, container_name)
    if status:
        return status.state.terminated is not None
    return False


def container_is_succeeded(pod: V1Pod, container_name: str) -> bool:
    """
    Report whether ``container_name`` in V1Pod ``pod`` completed successfully.

    Returns True only when a status entry for the container exists, its
    ``state.terminated`` is set and the recorded exit code is 0. Returns False
    otherwise.
    """
    status = get_container_status(pod, container_name)
    terminated = status.state.terminated if status else None
    if terminated is None:
        return False
    return terminated.exit_code == 0


def container_is_wait(pod: V1Pod, container_name: str) -> bool:
    """
    Report whether ``container_name`` in V1Pod ``pod`` is waiting.

    Returns True only when a status entry for the container exists and its
    ``state.waiting`` is set. Returns False otherwise.
    """
    status = get_container_status(pod, container_name)
    return status.state.waiting is not None if status else False


def container_is_terminated(pod: V1Pod, container_name: str) -> bool:
    """
    Report whether ``container_name`` in V1Pod ``pod`` is terminated.

    Only ``pod.status.container_statuses`` is inspected here (init container
    statuses are not read). Returns True when the first entry with a matching
    name has ``state.terminated`` set. Returns False otherwise.
    """
    if not pod or not pod.status:
        return False
    for entry in pod.status.container_statuses or ():
        if entry.name == container_name:
            # Only the first matching entry is considered.
            return entry.state.terminated is not None
    return False


def get_container_termination_message(pod: V1Pod, container_name: str) -> str | None:
    """
    Return the termination message of ``container_name`` in V1Pod ``pod``.

    Only ``pod.status.container_statuses`` is consulted. The lookup is best
    effort: any ``AttributeError`` or ``TypeError`` raised while walking the
    pod structure (for example a missing pod, missing statuses, or a container
    that has not terminated) is suppressed and None is returned instead.

    :param pod: pod object to inspect.
    :param container_name: name of the container whose message is wanted.
    :return: the termination message, or None when it is not available.
    """
    with suppress(AttributeError, TypeError):
        container_statuses = pod.status.container_statuses
        container_status = next((x for x in container_statuses if x.name == container_name), None)
        return container_status.state.terminated.message if container_status else None
    # Reached only when an exception was suppressed above.
    return None
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
import math
import time
from collections.abc import Callable, Generator, Iterable
from contextlib import closing, suppress
from contextlib import closing
from dataclasses import dataclass
from datetime import timedelta
from typing import TYPE_CHECKING, Literal, Protocol, cast
from typing import TYPE_CHECKING, Literal, cast

import pendulum
import tenacity
Expand All @@ -40,6 +40,13 @@

from airflow.exceptions import AirflowException
from airflow.providers.cncf.kubernetes.callbacks import ExecutionMode, KubernetesPodOperatorCallback
from airflow.providers.cncf.kubernetes.utils.container import (
container_is_completed,
container_is_running,
container_is_terminated,
container_is_wait,
get_container_status,
)
from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.timezone import utcnow
Expand All @@ -48,7 +55,6 @@
from kubernetes.client.models.core_v1_event_list import CoreV1EventList
from kubernetes.client.models.v1_container_state import V1ContainerState
from kubernetes.client.models.v1_container_state_waiting import V1ContainerStateWaiting
from kubernetes.client.models.v1_container_status import V1ContainerStatus
from kubernetes.client.models.v1_object_reference import V1ObjectReference
from kubernetes.client.models.v1_pod import V1Pod
from kubernetes.client.models.v1_pod_condition import V1PodCondition
Expand Down Expand Up @@ -89,131 +95,6 @@ class PodPhase:
terminal_states = {FAILED, SUCCEEDED}


class PodOperatorHookProtocol(Protocol):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this considered part of the public API?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does not look breaking to me.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The question is if it's part of our public API or not.
I don't know if this can be used in some user custom code (operator, executor)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this question is not the one that should be asked - hence I answered the right one. I don't think we ever specified what is and what is not public API.

But generally it's pretty much the same - Breaking = Public API change in a breaking way. I have not found it to be used elsewhere in our code except cncf.kubernetes - so I assume the intention was to not be used outside of it -> hence not breaking -> hence no public API.

BTW, Semver is not about whether something is a public API or not (because often it is not clearly specified) - but about whether intention of it was to be used as such.

So actually the proper question (if we want to be picky and ask really good question) to ask is "was that intended to be used outside of the provider and changes it in a breaking way?" , And my answer is "I don't think so".

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry was distracted all day.

Technically it was not declared private but I agree with Jarek that we did not define the "public API" on providers. Had the same question if this is breaking as well and did not find any other reference in "our" codebase.
The Protocol is not documented elsewhere and is not needed if you implement operators/Dags or so. So a "normal user" should not see this being removed. I assume every "Advanced user" that potentially builds around the package and extends the operators and needs this would know how to adjust but would be a very nice use case/extension. As GKE was also not using it I think it is safe to remove.

"""
Protocol to define methods relied upon by KubernetesPodOperator.

Subclasses of KubernetesPodOperator, such as GKEStartPodOperator, may use
hooks that don't extend KubernetesHook. We use this protocol to document the
methods used by KPO and ensure that these methods exist on such other hooks.
"""

@property
def core_v1_client(self) -> client.CoreV1Api:
"""Get authenticated client object."""

@property
def is_in_cluster(self) -> bool:
"""Expose whether the hook is configured with ``load_incluster_config`` or not."""

def get_pod(self, name: str, namespace: str) -> V1Pod:
"""Read pod object from kubernetes API."""

def get_namespace(self) -> str | None:
"""Return the namespace that defined in the connection."""

def get_xcom_sidecar_container_image(self) -> str | None:
"""Return the xcom sidecar image that defined in the connection."""

def get_xcom_sidecar_container_resources(self) -> str | None:
"""Return the xcom sidecar resources that defined in the connection."""


def get_container_status(pod: V1Pod, container_name: str) -> V1ContainerStatus | None:
    """
    Retrieve the status entry named ``container_name`` from ``pod``.

    Statuses of regular containers are examined before those of init
    containers, and the first name match is returned. None is returned when the
    pod or its status is absent, when no statuses exist, or when nothing matches.
    """
    if not (pod and pod.status):
        return None

    found_statuses = []
    for group in (pod.status.container_statuses, pod.status.init_container_statuses):
        if group:
            found_statuses.extend(group)

    for item in found_statuses:
        if item.name == container_name:
            return item
    return None


def container_is_running(pod: V1Pod, container_name: str) -> bool:
    """
    Check V1Pod ``pod`` for a running container called ``container_name``.

    True is returned when the container has a status entry whose
    ``state.running`` is set; in every other case the result is False.
    """
    found = get_container_status(pod, container_name)
    return found.state.running is not None if found else False


def container_is_completed(pod: V1Pod, container_name: str) -> bool:
    """
    Check V1Pod ``pod`` for a completed container called ``container_name``.

    True is returned when the container has a status entry whose
    ``state.terminated`` is set; in every other case the result is False.
    """
    found = get_container_status(pod, container_name)
    return bool(found) and found.state.terminated is not None


def container_is_succeeded(pod: V1Pod, container_name: str) -> bool:
    """
    Examine V1Pod ``pod`` to determine whether ``container_name`` is completed and succeeded.

    The container status is resolved once; the container counts as succeeded
    only when that status exists, its ``state.terminated`` is set and the
    recorded exit code is 0.

    :param pod: pod object to inspect.
    :param container_name: name of the container to check.
    :return: True if the container terminated with exit code 0, False otherwise.
    """
    container_status = get_container_status(pod, container_name)
    # A single lookup replaces the earlier completed-check plus second lookup.
    if not container_status or container_status.state.terminated is None:
        return False
    return container_status.state.terminated.exit_code == 0


def container_is_wait(pod: V1Pod, container_name: str) -> bool:
    """
    Check V1Pod ``pod`` for a waiting container called ``container_name``.

    True is returned when the container has a status entry whose
    ``state.waiting`` is set; in every other case the result is False.
    """
    found = get_container_status(pod, container_name)
    if found:
        return found.state.waiting is not None
    return False


def container_is_terminated(pod: V1Pod, container_name: str) -> bool:
    """
    Check V1Pod ``pod`` for a terminated container called ``container_name``.

    Only ``pod.status.container_statuses`` is read (init container statuses are
    not). True is returned when the first entry with a matching name has
    ``state.terminated`` set; in every other case the result is False.
    """
    statuses = None
    if pod and pod.status:
        statuses = pod.status.container_statuses
    if not statuses:
        return False

    matches = [entry for entry in statuses if entry.name == container_name]
    if not matches or not matches[0]:
        return False
    return matches[0].state.terminated is not None


def get_container_termination_message(pod: V1Pod, container_name: str) -> str | None:
    """
    Return the termination message recorded for ``container_name`` in ``pod``.

    Only ``pod.status.container_statuses`` is searched. ``AttributeError`` and
    ``TypeError`` raised while reading the pod structure are suppressed, so a
    missing pod, missing statuses or a container without a terminated state
    all yield None rather than an exception.

    :param pod: pod object to inspect.
    :param container_name: name of the container whose message is wanted.
    :return: the termination message, or None when it is not available.
    """
    with suppress(AttributeError, TypeError):
        container_statuses = pod.status.container_statuses
        container_status = next((x for x in container_statuses if x.name == container_name), None)
        return container_status.state.terminated.message if container_status else None
    # Falls through here only after a suppressed exception.
    return None


def check_exception_is_kubernetes_api_unauthorized(exc: BaseException):
    """
    Tell whether ``exc`` is a Kubernetes ``ApiException`` carrying status 401.

    Returns False for any other exception type. For an ``ApiException`` the
    result is falsy when its ``status`` is unset or empty, and otherwise the
    outcome of comparing the stringified status with ``"401"``.
    """
    if not isinstance(exc, ApiException):
        return False
    return exc.status and str(exc.status) == "401"

Expand Down
Loading