Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix deprecated calls in cncf.kubernetes provider #39381

Merged
merged 1 commit into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import (
annotations_for_logging_task_metadata,
annotations_to_key,
create_pod_id,
create_unique_id,
)
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
from airflow.utils.log.logging_mixin import LoggingMixin
Expand Down Expand Up @@ -413,7 +413,7 @@ def run_next(self, next_job: KubernetesJobType) -> None:
pod = PodGenerator.construct_pod(
namespace=self.namespace,
scheduler_job_id=self.scheduler_job_id,
pod_id=create_pod_id(dag_id, task_id),
pod_id=create_unique_id(dag_id, task_id),
dag_id=dag_id,
task_id=task_id,
kube_image=self.kube_config.kube_image,
Expand Down
40 changes: 11 additions & 29 deletions airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
import logging
import secrets
import string
import warnings
from typing import TYPE_CHECKING

import pendulum
from deprecated import deprecated
from slugify import slugify

from airflow.compat.functools import cache
Expand Down Expand Up @@ -59,6 +59,10 @@ def add_unique_suffix(*, name: str, rand_len: int = 8, max_len: int = POD_NAME_M
return name[: max_len - len(suffix)].strip("-.") + suffix


@deprecated(
reason="This function is deprecated. Please use `add_unique_suffix`",
category=AirflowProviderDeprecationWarning,
)
def add_pod_suffix(*, pod_name: str, rand_len: int = 8, max_len: int = POD_NAME_MAX_LENGTH) -> str:
"""Add random string to pod name while staying under max length.

Expand All @@ -67,14 +71,7 @@ def add_pod_suffix(*, pod_name: str, rand_len: int = 8, max_len: int = POD_NAME_
:param max_len: maximum length of the pod name
:meta private:
"""
warnings.warn(
"This function is deprecated. Please use `add_unique_suffix`.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)

suffix = "-" + rand_str(rand_len)
return pod_name[: max_len - len(suffix)].strip("-.") + suffix
return add_unique_suffix(name=pod_name, rand_len=rand_len, max_len=max_len)


def create_unique_id(
Expand Down Expand Up @@ -109,6 +106,10 @@ def create_unique_id(
return base_name


@deprecated(
reason="This function is deprecated. Please use `create_unique_id`.",
category=AirflowProviderDeprecationWarning,
)
def create_pod_id(
dag_id: str | None = None,
task_id: str | None = None,
Expand All @@ -125,26 +126,7 @@ def create_pod_id(
:param unique: whether a random string suffix should be added
:return: A valid identifier for a kubernetes pod name
"""
warnings.warn(
"This function is deprecated. Please use `create_unique_id`.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)

if not (dag_id or task_id):
raise ValueError("Must supply either dag_id or task_id.")
name = ""
if dag_id:
name += dag_id
if task_id:
if name:
name += "-"
name += task_id
base_name = slugify(name, lowercase=True)[:max_length].strip(".-")
if unique:
return add_pod_suffix(pod_name=base_name, rand_len=8, max_len=max_length)
else:
return base_name
return create_unique_id(dag_id=dag_id, task_id=task_id, max_length=max_length, unique=unique)


def annotations_to_key(annotations: dict[str, str]) -> TaskInstanceKey:
Expand Down
20 changes: 8 additions & 12 deletions airflow/providers/cncf/kubernetes/pod_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
)
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import (
POD_NAME_MAX_LENGTH,
add_pod_suffix,
add_unique_suffix,
rand_str,
)
Expand Down Expand Up @@ -156,12 +155,15 @@ def __init__(
# Attach sidecar
self.extract_xcom = extract_xcom

@deprecated(reason="This function is deprecated.", category=AirflowProviderDeprecationWarning)
@deprecated(
reason="This method is deprecated and will be removed in the future releases",
category=AirflowProviderDeprecationWarning,
)
def gen_pod(self) -> k8s.V1Pod:
"""Generate pod."""
result = self.ud_pod

result.metadata.name = add_pod_suffix(pod_name=result.metadata.name)
result.metadata.name = add_unique_suffix(name=result.metadata.name)

if self.extract_xcom:
result = self.add_xcom_sidecar(result)
Expand Down Expand Up @@ -210,8 +212,8 @@ def from_obj(obj) -> dict | k8s.V1Pod | None:
return k8s_object
elif isinstance(k8s_legacy_object, dict):
warnings.warn(
"Using a dictionary for the executor_config is deprecated and will soon be removed."
'please use a `kubernetes.client.models.V1Pod` class with a "pod_override" key'
"Using a dictionary for the executor_config is deprecated and will soon be removed. "
'Please use a `kubernetes.client.models.V1Pod` class with a "pod_override" key'
" instead. ",
category=AirflowProviderDeprecationWarning,
stacklevel=2,
Expand Down Expand Up @@ -575,7 +577,7 @@ def deserialize_model_dict(pod_dict: dict | None) -> k8s.V1Pod:

@staticmethod
@deprecated(
reason="This function is deprecated. Use `add_pod_suffix` in `kubernetes_helper_functions`.",
reason="This method is deprecated. Use `add_pod_suffix` in `kubernetes_helper_functions`.",
category=AirflowProviderDeprecationWarning,
)
def make_unique_pod_id(pod_id: str) -> str | None:
Expand All @@ -595,12 +597,6 @@ def make_unique_pod_id(pod_id: str) -> str | None:
:param pod_id: requested pod name
:return: ``str`` valid Pod name of appropriate length
"""
warnings.warn(
"This function is deprecated. Use `add_pod_suffix` in `kubernetes_helper_functions`.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)

if not pod_id:
return None

Expand Down
6 changes: 2 additions & 4 deletions airflow/providers/cncf/kubernetes/template_rendering.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@

from airflow.exceptions import AirflowException
from airflow.providers.cncf.kubernetes.kube_config import KubeConfig
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import (
create_pod_id,
)
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import create_unique_id
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
from airflow.utils.session import NEW_SESSION, provide_session

Expand All @@ -43,7 +41,7 @@ def render_k8s_pod_yaml(task_instance: TaskInstance) -> dict | None:
task_id=task_instance.task_id,
map_index=task_instance.map_index,
date=None,
pod_id=create_pod_id(task_instance.dag_id, task_instance.task_id),
pod_id=create_unique_id(task_instance.dag_id, task_instance.task_id),
try_number=task_instance.try_number,
kube_image=kube_config.kube_image,
args=task_instance.command_as_list(),
Expand Down
29 changes: 1 addition & 28 deletions tests/deprecations_ignore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@


# CLI
# https://github.com/apache/airflow/issues/39199
- tests/cli/commands/test_kubernetes_command.py::TestGenerateDagYamlCommand::test_generate_dag_yaml


Expand Down Expand Up @@ -456,34 +457,6 @@
- tests/providers/atlassian/jira/operators/test_jira.py::TestJiraOperator::test_project_issue_count
- tests/providers/atlassian/jira/operators/test_jira.py::TestJiraOperator::test_update_issue
- tests/providers/atlassian/jira/sensors/test_jira.py::TestJiraSensor::test_issue_label_set
- tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py::TestAirflowKubernetesScheduler::test_create_pod_id
- tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py::TestKubernetesExecutor::test_pod_template_file_override_in_executor_config
- tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py::TestKubernetesExecutor::test_run_next_exception_requeue
- tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py::TestKubernetesExecutor::test_run_next_pmh_error
- tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py::TestKubernetesExecutor::test_run_next_pod_reconciliation_error
- tests/providers/cncf/kubernetes/operators/test_pod.py::TestKubernetesPodOperator::test_execute_async_callbacks
- tests/providers/cncf/kubernetes/operators/test_pod.py::TestKubernetesPodOperator::test_mark_checked_if_not_deleted
- tests/providers/cncf/kubernetes/operators/test_pod.py::TestKubernetesPodOperator::test_pod_delete_after_await_container_error
- tests/providers/cncf/kubernetes/operators/test_pod.py::TestKubernetesPodOperatorAsync::test_async_create_pod_should_throw_exception
- tests/providers/cncf/kubernetes/operators/test_pod.py::TestKubernetesPodOperatorAsync::test_async_create_pod_with_skip_on_exit_code_should_skip
- tests/providers/cncf/kubernetes/operators/test_pod.py::TestKubernetesPodOperatorAsync::test_async_create_pod_xcom_push_should_execute_successfully
- tests/providers/cncf/kubernetes/operators/test_pod.py::TestKubernetesPodOperatorAsync::test_async_get_logs_should_execute_successfully
- tests/providers/cncf/kubernetes/operators/test_pod.py::TestKubernetesPodOperatorAsync::test_async_write_logs_should_execute_successfully
- tests/providers/cncf/kubernetes/operators/test_pod.py::test_async_kpo_wait_termination_before_cleanup_on_failure
- tests/providers/cncf/kubernetes/operators/test_pod.py::test_async_kpo_wait_termination_before_cleanup_on_success
- tests/providers/cncf/kubernetes/operators/test_pod.py::test_async_skip_kpo_wait_termination_with_timeout_event
- tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py::TestSparkKubernetesOperator::test_create_application_from_yaml_json
- tests/providers/cncf/kubernetes/test_kubernetes_helper_functions.py::TestCreatePodId::test_create_pod_id
- tests/providers/cncf/kubernetes/test_kubernetes_helper_functions.py::TestCreatePodId::test_create_pod_id_dag_and_task
- tests/providers/cncf/kubernetes/test_kubernetes_helper_functions.py::TestCreatePodId::test_create_pod_id_dag_only
- tests/providers/cncf/kubernetes/test_kubernetes_helper_functions.py::TestCreatePodId::test_create_pod_id_dag_too_long_non_unique
- tests/providers/cncf/kubernetes/test_kubernetes_helper_functions.py::TestCreatePodId::test_create_pod_id_dag_too_long_with_suffix
- tests/providers/cncf/kubernetes/test_kubernetes_helper_functions.py::TestCreatePodId::test_create_pod_id_task_only
- tests/providers/cncf/kubernetes/test_pod_generator.py::TestPodGenerator::test_from_obj
- tests/providers/cncf/kubernetes/test_pod_generator.py::TestPodGenerator::test_gen_pod_extract_xcom
- tests/providers/cncf/kubernetes/test_pod_generator.py::TestPodGenerator::test_pod_name_confirm_to_max_length
- tests/providers/cncf/kubernetes/test_pod_generator.py::TestPodGenerator::test_pod_name_is_valid
- tests/providers/cncf/kubernetes/test_template_rendering.py::test_render_k8s_pod_yaml
- tests/providers/common/sql/hooks/test_dbapi.py::TestDbApiHook::test_insert_rows_executemany
- tests/providers/common/sql/hooks/test_dbapi.py::TestDbApiHook::test_insert_rows_replace_executemany_hana_dialect
- tests/providers/common/sql/hooks/test_dbapi.py::TestDbApiHook::test_instance_check_works_for_legacy_db_api_hook
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,40 +28,36 @@
from kubernetes.client.rest import ApiException
from urllib3 import HTTPResponse

from airflow.exceptions import AirflowException
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.providers.cncf.kubernetes import pod_generator
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import (
KubernetesExecutor,
PodReconciliationError,
)
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import (
ADOPTED,
POD_EXECUTOR_DONE_KEY,
)
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils import (
AirflowKubernetesScheduler,
KubernetesJobWatcher,
ResourceVersion,
get_base_pod_from_template,
)
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import (
annotations_for_logging_task_metadata,
annotations_to_key,
create_unique_id,
get_logs_task_metadata,
)
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
from airflow.utils import timezone
from airflow.utils.state import State, TaskInstanceState
from tests.test_utils.config import conf_vars

try:
from airflow.providers.cncf.kubernetes import pod_generator
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import (
KubernetesExecutor,
PodReconciliationError,
)
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import (
ADOPTED,
POD_EXECUTOR_DONE_KEY,
)
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils import (
AirflowKubernetesScheduler,
KubernetesJobWatcher,
ResourceVersion,
create_pod_id,
get_base_pod_from_template,
)
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import (
annotations_for_logging_task_metadata,
annotations_to_key,
get_logs_task_metadata,
)
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
except ImportError:
AirflowKubernetesScheduler = None # type: ignore


class TestAirflowKubernetesScheduler:
@staticmethod
Expand Down Expand Up @@ -100,17 +96,12 @@ def _is_safe_label_value(value):
regex = r"^[^a-z0-9A-Z]*|[^a-zA-Z0-9_\-\.]|[^a-z0-9A-Z]*$"
return len(value) <= 63 and re.match(regex, value)

@pytest.mark.skipif(
AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed"
)
def test_create_pod_id(self):
for dag_id, task_id in self._cases():
pod_name = PodGenerator.make_unique_pod_id(create_pod_id(dag_id, task_id))
assert self._is_valid_pod_id(pod_name)
with pytest.warns(AirflowProviderDeprecationWarning, match=r"deprecated\. Use `add_pod_suffix`"):
pod_name = PodGenerator.make_unique_pod_id(create_unique_id(dag_id, task_id))
assert self._is_valid_pod_id(pod_name), f"dag_id={dag_id!r}, task_id={task_id!r}"

@pytest.mark.skipif(
AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed"
)
@mock.patch("airflow.providers.cncf.kubernetes.pod_generator.PodGenerator")
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubeConfig")
def test_get_base_pod_from_template(self, mock_kubeconfig, mock_generator, data_file):
Expand Down Expand Up @@ -144,14 +135,16 @@ def test_get_base_pod_from_template(self, mock_kubeconfig, mock_generator, data_

def test_make_safe_label_value(self):
for dag_id, task_id in self._cases():
case = f"dag_id={dag_id!r}, task_id={task_id!r}"
safe_dag_id = pod_generator.make_safe_label_value(dag_id)
assert self._is_safe_label_value(safe_dag_id)
assert self._is_safe_label_value(safe_dag_id), case
safe_task_id = pod_generator.make_safe_label_value(task_id)
assert self._is_safe_label_value(safe_task_id)
dag_id = "my_dag_id"
assert dag_id == pod_generator.make_safe_label_value(dag_id)
dag_id = "my_dag_id_" + "a" * 64
assert "my_dag_id_" + "a" * 43 + "-0ce114c45" == pod_generator.make_safe_label_value(dag_id)
assert self._is_safe_label_value(safe_task_id), case

dag_id = "my_dag_id"
assert dag_id == pod_generator.make_safe_label_value(dag_id)
dag_id = "my_dag_id_" + "a" * 64
assert "my_dag_id_" + "a" * 43 + "-0ce114c45" == pod_generator.make_safe_label_value(dag_id)

def test_execution_date_serialize_deserialize(self):
datetime_obj = datetime.now()
Expand Down
Loading