Skip to content

Commit

Permalink
[#504] Airflow DAGs unable to read credentials from k8s secret (#515)
Browse files Browse the repository at this point in the history
  • Loading branch information
allesh-clmb authored and Dmitrii Suslov committed Oct 23, 2018
1 parent aa027c9 commit b737f4b
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 6 deletions.
17 changes: 17 additions & 0 deletions deploy/helms/airflow/templates/credentials.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
apiVersion: v1
kind: Secret
metadata:
name: airflow-credentials
type: Opaque
data:
{{- range .Values.connections }}
{{ $_ := set . "conn_id" .connection_id }}
{{ $_ := unset . "connection_id" }}
{{ $_ := set . "conn_type" .connection_type }}
{{ $_ := unset . "connection_type" }}
{{ if hasKey . "extra" }}
{{ $extra := .extra | toJson }}
{{ $_ := set . "extra" $extra }}
{{ end }}
{{ .conn_id }}: {{ . | toYaml | b64enc }}
{{- end }}
2 changes: 1 addition & 1 deletion deploy/helms/airflow/templates/worker-rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ rules:
- apiGroups: [""] # core API group
resources: ["secrets"]
verbs: ["watch", "get"]
resourceNames: ['airflow-credentials-*']
resourceNames: ['airflow-credentials']
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1beta1
Expand Down
15 changes: 10 additions & 5 deletions legion_airflow/legion_airflow/hooks/k8s_base_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
K8SBaseHook module
"""
import os
import yaml
from airflow.hooks.base_hook import BaseHook
from airflow.models import Connection
from airflow.utils.log.logging_mixin import LoggingMixin
Expand All @@ -30,7 +31,7 @@ class K8SBaseHook(BaseHook):
upon failure to retrieve from k8s.
"""

STORAGE_NAME_PREFIX = 'airflow-credentials-'
STORAGE_NAME_PREFIX = 'airflow-credentials'

@classmethod
def get_connection(cls, conn_id):
Expand All @@ -44,8 +45,8 @@ def get_connection(cls, conn_id):
return cls._get_conn_from_k8s(conn_id)
except Exception as e:
LoggingMixin().log.warning(
'Failed to retrieve connection {} from k8s secret '
'retrieving from env/db'.format(conn_id),
'Failed to retrieve connection {} from k8s secret. The error message is {} '
'retrieving from env/db'.format(conn_id, e),
exc_info=True, stack_info=True
)
return super().get_connection(conn_id)
Expand All @@ -59,7 +60,11 @@ def _get_conn_from_k8s(cls, conn_id):
:type conn_id: str
"""
config_map = K8SSecretStorage.retrive(
storage_name=cls.STORAGE_NAME_PREFIX + conn_id,
storage_name=cls.STORAGE_NAME_PREFIX,
k8s_namespace=os.environ['NAMESPACE']
)
return Connection(**config_map.data)
if conn_id not in config_map.data:
raise Exception("Doesn't have {} value in k8s secret".format(conn_id))

connection_data = yaml.load(config_map.data[conn_id])
return Connection(**connection_data)

0 comments on commit b737f4b

Please sign in to comment.