Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class KubernetesHook(BaseHook, PodOperatorHookProtocol):
:param cluster_context: Optionally specify a context to use (e.g. if you have multiple
in your kubeconfig.
:param config_file: Path to kubeconfig file.
:param config_dict: Takes the config file as a dict.
:param in_cluster: Set to ``True`` if running from within a kubernetes cluster.
:param disable_verify_ssl: Set to ``True`` if SSL verification should be disabled.
:param disable_tcp_keepalive: Set to ``True`` if you want to disable keepalive logic.
Expand Down Expand Up @@ -145,6 +146,7 @@ def __init__(
client_configuration: client.Configuration | None = None,
cluster_context: str | None = None,
config_file: str | None = None,
config_dict: dict | None = None,
in_cluster: bool | None = None,
disable_verify_ssl: bool | None = None,
disable_tcp_keepalive: bool | None = None,
Expand All @@ -154,6 +156,7 @@ def __init__(
self.client_configuration = client_configuration
self.cluster_context = cluster_context
self.config_file = config_file
self.config_dict = config_dict
self.in_cluster = in_cluster
self.disable_verify_ssl = disable_verify_ssl
self.disable_tcp_keepalive = disable_tcp_keepalive
Expand Down Expand Up @@ -213,12 +216,14 @@ def get_conn(self) -> client.ApiClient:
cluster_context = self._coalesce_param(self.cluster_context, self._get_field("cluster_context"))
kubeconfig_path = self._coalesce_param(self.config_file, self._get_field("kube_config_path"))
kubeconfig = self._get_field("kube_config")
num_selected_configuration = sum(1 for o in [in_cluster, kubeconfig, kubeconfig_path] if o)
num_selected_configuration = sum(
1 for o in [in_cluster, kubeconfig, kubeconfig_path, self.config_dict] if o
)

if num_selected_configuration > 1:
raise AirflowException(
"Invalid connection configuration. Options kube_config_path, "
"kube_config, in_cluster are mutually exclusive. "
"kube_config, in_cluster, config_dict are mutually exclusive. "
"You can only use one option at a time."
)

Expand Down Expand Up @@ -265,6 +270,16 @@ def get_conn(self) -> client.ApiClient:
)
return client.ApiClient()

if self.config_dict:
self.log.debug(LOADING_KUBE_CONFIG_FILE_RESOURCE.format("config dictionary"))
self._is_in_cluster = False
config.load_kube_config_from_dict(
config_dict=self.config_dict,
client_configuration=self.client_configuration,
context=cluster_context,
)
return client.ApiClient()

return self._get_default_client(cluster_context=cluster_context)

def _get_default_client(self, *, cluster_context: str | None = None) -> client.ApiClient:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,22 @@ def test_get_yaml_content_from_file_error(self, mock_requests, mock_yaml):
mock_get.assert_called_with(YAML_URL, allow_redirects=True)
mock_yaml.safe_load_all.assert_not_called()

@patch("kubernetes.config.kube_config.KubeConfigLoader")
@patch("kubernetes.config.incluster_config.InClusterConfigLoader")
@patch("kubernetes.config.kube_config.KubeConfigMerger")
def test_load_config_with_config_dict(self, kube_config_merger, incluster_config, kube_config_loader):
hook = KubernetesHook(
conn_id=None,
in_cluster=False,
config_dict={"a": "b"},
cluster_context=None,
)
api_conn = hook.get_conn()
assert not incluster_config.called
assert hook._is_in_cluster is False
kube_config_loader.assert_called_once()
assert isinstance(api_conn, kubernetes.client.api_client.ApiClient)


class TestKubernetesHookIncorrectConfiguration:
@pytest.mark.parametrize(
Expand Down