Skip to content

Commit

Permalink
[AIRFLOW-3126] Add option to specify additional K8s volumes
Browse files Browse the repository at this point in the history
  • Loading branch information
brandonwillard committed Jun 18, 2020
1 parent 48da4b4 commit 37fa206
Show file tree
Hide file tree
Showing 6 changed files with 319 additions and 0 deletions.
14 changes: 14 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1898,6 +1898,20 @@
type: string
example: ~
default: ""
- name: extra_volume_mounts
description: |
Extra volumes to be mounted in worker pods. Volumes are specified as
keys in a JSON object with nested JSON values specifying each volume's
options. Recognized options are `claim_name` or `secret_name`
(required), `mount_path` (required), `read_only` (boolean, default
null), `sub_path` (default null), `secret_key` (string, default null),
`secret_mode` (string, default null).
version_added: ~
type: string
example: >-
{{"secret_vol": {{"secret_name": "some-secret", "mount_path": "/dir1", "sub_path": "subpath1",
"secret_mode": "440"}}, "pvc": {{"claim_name": "some-pvc", "mount_path": "/dir2"}}}}
default: ""
- name: dags_volume_host
description: |
For DAGs mounted via a hostPath volume (mutually exclusive with volume claim and git-sync)
Expand Down
9 changes: 9 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,15 @@ logs_volume_subpath =
# A shared volume claim for the logs
logs_volume_claim =

# Extra volumes to be mounted in worker pods. Volumes are specified as
# keys in a JSON object with nested JSON values specifying each volume's
# options. Recognized options are `claim_name` or `secret_name`
# (required), `mount_path` (required), `read_only` (boolean, default
# null), `sub_path` (default null), `secret_key` (string, default null),
# `secret_mode` (string, default null).
# Example: extra_volume_mounts = {{{{"secret_vol": {{{{"secret_name": "some-secret", "mount_path": "/dir1", "sub_path": "subpath1", "secret_mode": "440"}}}}, "pvc": {{{{"claim_name": "some-pvc", "mount_path": "/dir2"}}}}}}}}
extra_volume_mounts =

# For DAGs mounted via a hostPath volume (mutually exclusive with volume claim and git-sync)
# Useful in local environment, discouraged in production
dags_volume_host =
Expand Down
75 changes: 75 additions & 0 deletions airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import json
import multiprocessing
import time
from json import JSONDecodeError
from queue import Empty, Queue # pylint: disable=unused-import
from typing import Any, Dict, Optional, Tuple, Union

Expand Down Expand Up @@ -169,6 +170,9 @@ def __init__(self): # pylint: disable=too-many-statements
self.dags_volume_subpath = conf.get(
self.kubernetes_section, 'dags_volume_subpath')

# This prop may optionally be set for the addition of extra volume mounts
self.extra_volume_mounts = self._parse_extra_volume_mounts()

# This prop may optionally be set for PV Claims and is used to locate logs
# on a SubPath
self.logs_volume_subpath = conf.get(
Expand Down Expand Up @@ -256,6 +260,25 @@ def _get_security_context_val(self, scontext: str) -> Union[str, int]:
else:
return int(val)

def _parse_extra_volume_mounts(self) -> Dict[str, Dict[str, Any]]:
res: Dict[str, Dict[str, Any]] = {}

extra_volume_mounts = conf.get(self.kubernetes_section, 'extra_volume_mounts')

if extra_volume_mounts: # pylint: disable=too-many-nested-blocks

try:
res = json.loads(extra_volume_mounts)
except JSONDecodeError as e:
raise AirflowConfigException(
'Error parsing config option'
' `extra_volume_mounts`: {}.'.format(e))

for pvc_name, pvc_settings in res.items():
validate_pvc_settings(pvc_name, pvc_settings)

return res

def _validate(self):
if self.pod_template_file:
return
Expand Down Expand Up @@ -283,6 +306,58 @@ def _validate(self):
# pylint: enable=too-many-boolean-expressions


def validate_pvc_settings(pvc_name: str, pvc_settings: Dict[str, Any]) -> None:
"""Validate additional persistent volume claim settings.
:raises: airflow.exceptions.AirflowConfigException
"""
if 'mount_path' not in pvc_settings:
raise AirflowConfigException(
'Missing `mount_path` in config option'
' `extra_volume_mounts`: {}.'.format(pvc_name))

pvc_settings.setdefault('sub_path', None)

read_only = pvc_settings.setdefault('read_only', None)
if not isinstance(read_only, (bool, type(None))):
raise AirflowConfigException(
'Value of `read_only` is not boolean in config option'
' `extra_volume_mounts`: {}.'.format(read_only))

secret_mode = pvc_settings.setdefault('secret_mode', None)
if secret_mode is not None:
try:
pvc_settings['secret_mode'] = int(secret_mode, 8)
pvc_settings['secret'] = True
except TypeError as e:
raise AirflowConfigException(
'Error converting `secret_mode` in config option'
' `extra_volume_mounts`: {}.'.format(e))

secret_key = pvc_settings.setdefault('secret_key', None)
if secret_key is not None:
pvc_settings['secret'] = True

if 'secret_key_path' not in pvc_settings:
raise AirflowConfigException(
'Missing `secret_key_path` in config option'
' `extra_volume_mounts`: {}.'.format(pvc_name))

claim_name = pvc_settings.setdefault('claim_name', None)
secret_name = pvc_settings.setdefault('secret_name', None)

if secret_name is None and \
(secret_key is not None or secret_mode is not None):
raise AirflowConfigException(
'Missing `secret_name` in config option'
' `extra_volume_mounts`: {}.'.format(read_only))
elif not (secret_name or claim_name):
raise AirflowConfigException(
'Missing `claim_name` or `secret_name` value'
' in config option'
' `extra_volume_mounts`: {}.'.format(read_only))


class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
"""Watches for Kubernetes jobs"""

Expand Down
32 changes: 32 additions & 0 deletions airflow/kubernetes/worker_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,13 @@ def _get_volume_mounts(self) -> List[k8s.V1VolumeMount]:
)
}

for vol_name, vol_mnt_args in self.kube_config.extra_volume_mounts.items():
volume_mounts[vol_name] = k8s.V1VolumeMount(
name=vol_name,
sub_path=vol_mnt_args['sub_path'],
mount_path=vol_mnt_args['mount_path'],
read_only=vol_mnt_args['read_only'])

if self.kube_config.dags_volume_subpath:
volume_mounts[self.dags_volume_name].sub_path = self.kube_config.dags_volume_subpath

Expand Down Expand Up @@ -331,6 +338,31 @@ def _construct_volume(name, claim, host) -> k8s.V1Volume:
)
}

for vol_name, vol_args in self.kube_config.extra_volume_mounts.items():

if vol_args['secret_name']:
pvc_volume = k8s.V1Volume(
name=vol_name,
secret=k8s.V1SecretVolumeSource(
secret_name=vol_args['secret_name']
)
)

if vol_args['secret_key']:
pvc_volume.secret.items = [
k8s.V1KeyToPath(
key=vol_args['secret_key'],
path=vol_args['secret_key_path'],
mode=vol_args['secret_mode']
)]
else:
pvc_volume.secret.default_mode = vol_args['secret_mode']
else:
pvc_volume = _construct_volume(
vol_name, vol_args['claim_name'], None)

volumes[vol_name] = pvc_volume

if self.kube_config.dags_in_image:
del volumes[self.dags_volume_name]

Expand Down
120 changes: 120 additions & 0 deletions tests/executors/test_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from airflow.executors.kubernetes_executor import KubeConfig
from airflow.kubernetes import pod_generator
from airflow.kubernetes.pod_generator import PodGenerator
from airflow.exceptions import AirflowConfigException
from airflow.utils.state import State
except ImportError:
AirflowKubernetesScheduler = None # type: ignore
Expand Down Expand Up @@ -168,6 +169,125 @@ def test_kube_config_git_sync_run_as_user_not_present(self):
def test_kube_config_git_sync_run_as_user_empty_string(self):
self.assertEqual(KubeConfig().git_sync_run_as_user, '')

@conf_vars({
('kubernetes', 'dags_volume_subpath'): 'dags',
('kubernetes', 'logs_volume_subpath'): 'logs',
('kubernetes', 'dags_volume_claim'): 'dags',
('kubernetes', 'dags_folder'): 'dags',
('kubernetes', 'extra_volume_mounts'):
'{"pvc1": {"secret_name": "secret1", "mount_path": "/volume1",'
' "sub_path": "subpath1", "secret_mode": "440"}}',
})
def test_worker_extra_volume_mounts_single(self):
kube_config = KubeConfig()

assert len(kube_config.extra_volume_mounts) == 1
assert kube_config.extra_volume_mounts['pvc1'] == {
'claim_name': None,
'mount_path': '/volume1',
'sub_path': 'subpath1',
'read_only': None,
'secret_name': 'secret1',
'secret': True,
'secret_key': None,
'secret_mode': 0o440,
}

@conf_vars({
('kubernetes', 'dags_volume_subpath'): 'dags',
('kubernetes', 'logs_volume_subpath'): 'logs',
('kubernetes', 'dags_volume_claim'): 'dags',
('kubernetes', 'dags_folder'): 'dags',
('kubernetes', 'extra_volume_mounts'): '{[]}',
})
def test_worker_extra_volume_mounts_invalid_json(self):
with self.assertRaisesRegex(AirflowConfigException,
'Error parsing.*`extra_volume_mounts`.*'):
KubeConfig()

@conf_vars({
('kubernetes', 'dags_volume_subpath'): 'dags',
('kubernetes', 'logs_volume_subpath'): 'logs',
('kubernetes', 'dags_volume_claim'): 'dags',
('kubernetes', 'dags_folder'): 'dags',
('kubernetes', 'extra_volume_mounts'):
'{"pvc1": {"mount_path": "/volume1", "secret_mode": 1}}',
})
def test_worker_extra_volume_mounts_invalid_mode(self):
with self.assertRaisesRegex(AirflowConfigException,
'Error converting.*`extra_volume_mounts`.*'):
KubeConfig()

@conf_vars({
('kubernetes', 'dags_volume_subpath'): 'dags',
('kubernetes', 'logs_volume_subpath'): 'logs',
('kubernetes', 'dags_volume_claim'): 'dags',
('kubernetes', 'dags_folder'): 'dags',
('kubernetes', 'extra_volume_mounts'):
'{"pvc1": {"mount_path": "/volume1", "read_only": "no"}}',
})
def test_worker_extra_volume_mounts_invalid_read_only(self):
with self.assertRaisesRegex(
AirflowConfigException,
'Value of `read_only` is not boolean.*`extra_volume_mounts`.*'):
KubeConfig()

@conf_vars({
('kubernetes', 'dags_volume_subpath'): 'dags',
('kubernetes', 'logs_volume_subpath'): 'logs',
('kubernetes', 'dags_volume_claim'): 'dags',
('kubernetes', 'dags_folder'): 'dags',
('kubernetes', 'extra_volume_mounts'):
'{"pvc1": {"mount_path": "/volume1", "secret_key": "key"}}',
})
def test_worker_extra_volume_mounts_missing_key_path(self):
with self.assertRaisesRegex(
AirflowConfigException,
'Missing `secret_key_path`.*`extra_volume_mounts`.*'):
KubeConfig()

@conf_vars({
('kubernetes', 'dags_volume_subpath'): 'dags',
('kubernetes', 'logs_volume_subpath'): 'logs',
('kubernetes', 'dags_volume_claim'): 'dags',
('kubernetes', 'dags_folder'): 'dags',
('kubernetes', 'extra_volume_mounts'):
'{"pvc1": {"read_only": true}}',
})
def test_worker_extra_volume_mounts_missing_mount_path(self):
with self.assertRaisesRegex(
AirflowConfigException,
'Missing `mount_path`.*`extra_volume_mounts`.*'):
KubeConfig()

@conf_vars({
('kubernetes', 'dags_volume_subpath'): 'dags',
('kubernetes', 'logs_volume_subpath'): 'logs',
('kubernetes', 'dags_volume_claim'): 'dags',
('kubernetes', 'dags_folder'): 'dags',
('kubernetes', 'extra_volume_mounts'):
'{"pvc1": {"mount_path": "/volume1", "secret_mode": "443"}}',
})
def test_worker_extra_volume_mounts_missing_secret_name(self):
with self.assertRaisesRegex(
AirflowConfigException,
'Missing `secret_name`.*`extra_volume_mounts`.*'):
KubeConfig()

@conf_vars({
('kubernetes', 'dags_volume_subpath'): 'dags',
('kubernetes', 'logs_volume_subpath'): 'logs',
('kubernetes', 'dags_volume_claim'): 'dags',
('kubernetes', 'dags_folder'): 'dags',
('kubernetes', 'extra_volume_mounts'):
'{"pvc1": {"mount_path": "/volume1"}}',
})
def test_worker_extra_volume_mounts_missing_name(self):
with self.assertRaisesRegex(
AirflowConfigException,
'Missing `claim_name` or `secret_name`.*`extra_volume_mounts`.*'):
KubeConfig()


class TestKubernetesExecutor(unittest.TestCase):
"""
Expand Down
69 changes: 69 additions & 0 deletions tests/kubernetes/test_worker_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,75 @@ def test_delete_option_kwargs_config(self, config, expected_value):
}):
self.assertEqual(KubeConfig().delete_option_kwargs, expected_value)

@conf_vars({
('kubernetes', 'dags_volume_subpath'): 'dags',
('kubernetes', 'logs_volume_subpath'): 'logs',
('kubernetes', 'dags_volume_claim'): 'dags',
('kubernetes', 'dags_folder'): 'dags',
('kubernetes', 'extra_volume_mounts'):
'{"pvc1": {"claim_name": "c1", "mount_path": "/volume1", "sub_path": "subpath1"},'
' "pvc2": {"claim_name": "c2", "mount_path": "/volume2", "read_only": true},'
' "pvc3": {"secret_name": "s1", "mount_path": "/volume3", "secret": true},'
' "pvc4": {"secret_name": "s2", "mount_path": "/volume4", "secret_mode": "440"},'
' "pvc5": {"secret_name": "s3", "mount_path": "/volume5", "secret_mode": "440",'
' "secret_key": "key", "secret_key_path": "path"}}',
})
def test_worker_extra_volume_mounts(self):
kube_config = KubeConfig()

assert kube_config.extra_volume_mounts

worker_config = WorkerConfiguration(kube_config)
volume_mounts = worker_config._get_volume_mounts()
volume_mounts = {v.name: v for v in volume_mounts}

assert 'pvc1' in volume_mounts
assert 'pvc2' in volume_mounts
assert 'pvc3' in volume_mounts
assert 'pvc4' in volume_mounts
assert volume_mounts['pvc1'].mount_path == '/volume1'
assert volume_mounts['pvc1'].read_only is None
assert volume_mounts['pvc1'].sub_path == 'subpath1'
assert volume_mounts['pvc2'].mount_path == '/volume2'
assert volume_mounts['pvc2'].read_only is True
assert volume_mounts['pvc2'].sub_path is None
assert volume_mounts['pvc3'].mount_path == '/volume3'
assert volume_mounts['pvc3'].read_only is None
assert volume_mounts['pvc3'].sub_path is None
assert volume_mounts['pvc4'].mount_path == '/volume4'
assert volume_mounts['pvc4'].read_only is None
assert volume_mounts['pvc4'].sub_path is None
assert volume_mounts['pvc5'].mount_path == '/volume5'
assert volume_mounts['pvc5'].read_only is None
assert volume_mounts['pvc5'].sub_path is None

volumes = {v.name: v for v in worker_config._get_volumes()}
assert 'pvc1' in volumes
assert 'pvc2' in volumes
assert 'pvc3' in volumes
assert 'pvc4' in volumes
assert 'pvc5' in volumes
assert volumes['pvc1'].persistent_volume_claim.claim_name == 'c1'
assert volumes['pvc2'].persistent_volume_claim.claim_name == 'c2'

assert volumes['pvc3'].persistent_volume_claim is None
assert volumes['pvc3'].secret.secret_name == 's1'
assert volumes['pvc3'].secret.default_mode is None
assert volumes['pvc3'].secret.items is None

assert volumes['pvc4'].persistent_volume_claim is None
assert volumes['pvc4'].secret.secret_name == 's2'
assert volumes['pvc4'].secret.default_mode == 0o440
assert volumes['pvc4'].secret.items is None

assert volumes['pvc5'].persistent_volume_claim is None
assert volumes['pvc5'].secret.secret_name == 's3'
assert volumes['pvc5'].secret.default_mode is None
assert len(volumes['pvc5'].secret.items) == 1
assert volumes['pvc5'].secret.items[0].key == 'key'
assert volumes['pvc5'].secret.items[0].path == 'path'
assert volumes['pvc5'].secret.items[0].mode == 0o440

def test_worker_with_subpaths(self):
self.kube_config.dags_volume_subpath = 'dags'
self.kube_config.logs_volume_subpath = 'logs'
Expand Down

0 comments on commit 37fa206

Please sign in to comment.