diff --git a/deploy/helms/airflow/templates/credentials.yaml b/deploy/helms/airflow/templates/credentials.yaml new file mode 100644 index 000000000..8387afa62 --- /dev/null +++ b/deploy/helms/airflow/templates/credentials.yaml @@ -0,0 +1,17 @@ +apiVersion: v1 +kind: Secret +metadata: + name: airflow-credentials +type: Opaque +data: +{{- range .Values.connections }} + {{ $_ := set . "conn_id" .connection_id }} + {{ $_ := unset . "connection_id" }} + {{ $_ := set . "conn_type" .connection_type }} + {{ $_ := unset . "connection_type" }} + {{ if hasKey . "extra" }} + {{ $extra := .extra | toJson }} + {{ $_ := set . "extra" $extra }} + {{ end }} + {{ .conn_id }}: {{ . | toYaml | b64enc }} +{{- end }} diff --git a/deploy/helms/airflow/templates/worker-rbac.yaml b/deploy/helms/airflow/templates/worker-rbac.yaml index 4419fbfbc..5c9dbbf33 100644 --- a/deploy/helms/airflow/templates/worker-rbac.yaml +++ b/deploy/helms/airflow/templates/worker-rbac.yaml @@ -9,7 +9,7 @@ rules: - apiGroups: [""] # core API group resources: ["secrets"] verbs: ["watch", "get"] - resourceNames: ['airflow-credentials-*'] + resourceNames: ['airflow-credentials'] --- kind: RoleBinding apiVersion: rbac.authorization.k8s.io/v1beta1 diff --git a/legion_airflow/legion_airflow/hooks/k8s_base_hook.py b/legion_airflow/legion_airflow/hooks/k8s_base_hook.py index 26f582dcc..5629830c6 100644 --- a/legion_airflow/legion_airflow/hooks/k8s_base_hook.py +++ b/legion_airflow/legion_airflow/hooks/k8s_base_hook.py @@ -17,6 +17,7 @@ K8SBaseHook module """ import os +import yaml from airflow.hooks.base_hook import BaseHook from airflow.models import Connection from airflow.utils.log.logging_mixin import LoggingMixin @@ -30,7 +31,7 @@ class K8SBaseHook(BaseHook): upon failure to retrieve from k8s. """ - STORAGE_NAME_PREFIX = 'airflow-credentials-' + STORAGE_NAME_PREFIX = 'airflow-credentials' @classmethod def get_connection(cls, conn_id): @@ -44,8 +45,8 @@ def get_connection(cls, conn_id): return cls._get_conn_from_k8s(conn_id) except Exception as e: LoggingMixin().log.warning( - 'Failed to retrieve connection {} from k8s secret ' - 'retrieving from env/db'.format(conn_id), + 'Failed to retrieve connection {} from k8s secret. The error message is {} ' + 'retrieving from env/db'.format(conn_id, e), exc_info=True, stack_info=True ) return super().get_connection(conn_id) @@ -59,7 +60,11 @@ def _get_conn_from_k8s(cls, conn_id): :type conn_id: str """ config_map = K8SSecretStorage.retrive( - storage_name=cls.STORAGE_NAME_PREFIX + conn_id, + storage_name=cls.STORAGE_NAME_PREFIX, k8s_namespace=os.environ['NAMESPACE'] ) - return Connection(**config_map.data) + if conn_id not in config_map.data: + raise Exception("Doesn't have {} value in k8s secret".format(conn_id)) + + connection_data = yaml.load(config_map.data[conn_id]) + return Connection(**connection_data)