Skip to content

Commit

Permalink
K8s hook should still work with missing default conn (#31187)
Browse files Browse the repository at this point in the history
It's common when using KPO / K8s hook to want to simply use the RBAC of the cluster.  This was the default behavior prior to #28848.  After that change, users are forced to add a k8s connection or their dags will break.

To fix this, in the special case where conn_id=="kubernetes_default", if the conn is missing, we ignore the failure.
  • Loading branch information
dstandish authored May 10, 2023
1 parent 584a9f5 commit 8eab2e5
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 1 deletion.
8 changes: 8 additions & 0 deletions airflow/providers/cncf/kubernetes/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ Features
* ``Deprecate 'skip_exit_code' in 'DockerOperator' and 'KubernetesPodOperator' (#30733)``
* ``Remove skip_exit_code from KubernetesPodOperator (#30788)``
6.1.0
.....

Features
~~~~~~~~

If ``kubernetes_default`` connection is not defined, then KubernetesHook / KubernetesPodOperator will behave as though given ``conn_id=None``.

6.0.0
.....

Expand Down
19 changes: 18 additions & 1 deletion airflow/providers/cncf/kubernetes/hooks/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@
from urllib3.exceptions import HTTPError

from airflow.compat.functools import cached_property
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.exceptions import AirflowException, AirflowNotFoundException, AirflowProviderDeprecationWarning
from airflow.hooks.base import BaseHook
from airflow.kubernetes.kube_client import _disable_verify_ssl, _enable_tcp_keepalive
from airflow.models import Connection
from airflow.utils import yaml

LOADING_KUBE_CONFIG_FILE_RESOURCE = "Loading Kubernetes configuration file kube_config from {}..."
Expand Down Expand Up @@ -141,6 +142,22 @@ def _coalesce_param(*params):
if param is not None:
return param

@classmethod
def get_connection(cls, conn_id: str) -> Connection:
"""
Return requested connection.
If missing and conn_id is "kubernetes_default", will return empty connection so that hook will
default to cluster-derived credentials.
"""
try:
return super().get_connection(conn_id)
except AirflowNotFoundException:
if conn_id == cls.default_conn_name:
return Connection(conn_id=cls.default_conn_name)
else:
raise

@cached_property
def conn_extras(self):
if self.conn_id:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@
import kubernetes
import pytest
from kubernetes.config import ConfigException
from sqlalchemy.orm import make_transient

from airflow import AirflowException
from airflow.exceptions import AirflowNotFoundException
from airflow.hooks.base import BaseHook
from airflow.models import Connection
from airflow.providers.cncf.kubernetes.hooks.kubernetes import AsyncKubernetesHook, KubernetesHook
from airflow.utils import db
Expand All @@ -54,6 +57,22 @@ class DeprecationRemovalRequired(AirflowException):
...


DEFAULT_CONN_ID = "kubernetes_default"


@pytest.fixture()
def remove_default_conn(session):
before_conn = session.query(Connection).filter(Connection.conn_id == DEFAULT_CONN_ID).one_or_none()
if before_conn:
session.delete(before_conn)
session.commit()
yield
if before_conn:
make_transient(before_conn)
session.add(before_conn)
session.commit()


class TestKubernetesHook:
@classmethod
def setup_class(cls) -> None:
Expand Down Expand Up @@ -391,6 +410,20 @@ def test_prefixed_names_still_work(self, mock_get_client):
mock_get_client.assert_called_with(cluster_context="test")
assert kubernetes_hook.get_namespace() == "test"

def test_missing_default_connection_is_ok(self, remove_default_conn):
# prove to ourselves that the default conn doesn't exist
with pytest.raises(AirflowNotFoundException):
BaseHook.get_connection(DEFAULT_CONN_ID)

# verify K8sHook still works
hook = KubernetesHook()
assert hook.conn_extras == {}

# meanwhile, asking for non-default should still fail if it doesn't exist
hook = KubernetesHook("some_conn")
with pytest.raises(AirflowNotFoundException, match="The conn_id `some_conn` isn't defined"):
hook.conn_extras

@patch("kubernetes.config.kube_config.KubeConfigLoader")
@patch("kubernetes.config.kube_config.KubeConfigMerger")
@patch(f"{HOOK_MODULE}.client.CustomObjectsApi")
Expand Down

0 comments on commit 8eab2e5

Please sign in to comment.